This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4aa81204ff8 KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. (#19334)
4aa81204ff8 is described below
commit 4aa81204ff840bbd3e07f2b472ffbafc530225e9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Apr 1 12:23:06 2025 +0100
KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. (#19334)

The PR adds `MaxRecords` to the share fetch request and `AcquisitionLockTimeout` to the share fetch response. It also removes the internal broker config `max.fetch.records`.
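For illustration only (this sketch is not part of the patch, and the local variable names are hypothetical), the changed client-facing surface roughly looks like:

    // Request side: the builder now takes maxRecords ahead of batchSize.
    ShareFetchRequest.Builder builder = ShareFetchRequest.Builder.forConsumer(
            groupId, metadata, maxWaitMs, minBytes, maxBytes,
            maxRecords,   // new: upper bound on records returned by a single fetch
            batchSize, send, forget, acknowledgementsMap);

    // Response side: the broker now reports how long acquired records stay locked.
    int lockTimeoutMs = shareFetchResponse.data().acquisitionLockTimeoutMs();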
Reviewers: Andrew Schofield <[email protected]>
---
.../consumer/internals/ShareSessionHandler.java | 2 +-
.../kafka/common/requests/ShareFetchRequest.java | 5 +-
.../kafka/common/requests/ShareFetchResponse.java | 9 +-
.../common/message/ShareFetchRequest.json | 4 +-
.../common/message/ShareFetchResponse.json | 2 +
.../internals/ShareConsumeRequestManagerTest.java | 52 +++---
.../kafka/server/builders/KafkaApisBuilder.java | 11 +-
.../java/kafka/server/share/ShareFetchUtils.java | 17 ++
.../java/kafka/server/share/SharePartition.java | 8 +-
.../kafka/server/share/SharePartitionManager.java | 14 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 11 +-
.../server/share/SharePartitionManagerTest.java | 81 +++++----
.../scala/unit/kafka/server/KafkaApisTest.scala | 38 +++--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 -
.../server/ShareFetchAcknowledgeRequestTest.scala | 190 ++++++++++++++++++++-
.../group/modern/share/ShareGroupConfig.java | 13 --
.../metadata/KRaftMetadataRequestBenchmark.java | 3 +
.../kafka/server/share/context/FinalContext.java | 2 +-
.../server/share/context/ShareFetchContext.java | 2 +-
.../server/share/context/ShareSessionContext.java | 12 +-
21 files changed, 334 insertions(+), 147 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index e88318217ad..34a109944be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -180,7 +180,7 @@ public class ShareSessionHandler {
         return ShareFetchRequest.Builder.forConsumer(
                 groupId, nextMetadata, fetchConfig.maxWaitMs,
                 fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.maxPollRecords,
-                added, removed, acknowledgementBatches);
+                fetchConfig.maxPollRecords, added, removed, acknowledgementBatches);
     }

     public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String groupId, FetchConfig fetchConfig) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 7dcf311bd7f..215fb6e90c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -46,8 +46,8 @@ public class ShareFetchRequest extends AbstractRequest {
         }

         public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
-                                          int maxWait, int minBytes, int maxBytes, int batchSize,
-                                          List<TopicIdPartition> send, List<TopicIdPartition> forget,
+                                          int maxWait, int minBytes, int maxBytes, int maxRecords,
+                                          int batchSize, List<TopicIdPartition> send, List<TopicIdPartition> forget,
                                           Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
             ShareFetchRequestData data = new ShareFetchRequestData();
             data.setGroupId(groupId);
@@ -62,6 +62,7 @@ public class ShareFetchRequest extends AbstractRequest {
             data.setMaxWaitMs(maxWait);
             data.setMinBytes(minBytes);
             data.setMaxBytes(maxBytes);
+            data.setMaxRecords(maxRecords);
             data.setBatchSize(batchSize);

             // Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 619e740029d..0c40ea2039a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -144,7 +144,7 @@ public class ShareFetchResponse extends AbstractResponse {
                                    Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
         // use arbitrary values here without affecting the result.
-        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList());
+        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList(), 0);
         ObjectSerializationCache cache = new ObjectSerializationCache();
         return 4 + data.size(cache, version);
     }
@@ -159,13 +159,13 @@ public class ShareFetchResponse extends AbstractResponse {
     public static ShareFetchResponse of(Errors error,
                                         int throttleTimeMs,
                                         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
-                                        List<Node> nodeEndpoints) {
-        return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
+                                        List<Node> nodeEndpoints, int acquisitionLockTimeout) {
+        return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
     }

     public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
                                                    Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
-                                                   List<Node> nodeEndpoints) {
+                                                   List<Node> nodeEndpoints, int acquisitionLockTimeout) {
         Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
         while (partIterator.hasNext()) {
             Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
@@ -193,6 +193,7 @@ public class ShareFetchResponse extends AbstractResponse {
                                 .setRack(endpoint.rack())));
         return data.setThrottleTimeMs(throttleTimeMs)
                 .setErrorCode(error.code())
+                .setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
                 .setResponses(new ArrayList<>(topicResponseList.values()));
     }
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json b/clients/src/main/resources/common/message/ShareFetchRequest.json
index e85fc095861..82ac3093db2 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -36,7 +36,9 @@
     { "name": "MinBytes", "type": "int32", "versions": "0+",
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
-      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
+      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
+    { "name": "MaxRecords", "type": "int32", "versions": "0+",
+      "about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
     { "name": "BatchSize", "type": "int32", "versions": "0+",
       "about": "The optimal number of records for batches of acquired records and acknowledgements." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json b/clients/src/main/resources/common/message/ShareFetchResponse.json
index 858b0bdd46f..ed459f304af 100644
--- a/clients/src/main/resources/common/message/ShareFetchResponse.json
+++ b/clients/src/main/resources/common/message/ShareFetchResponse.json
@@ -39,6 +39,8 @@
       "about": "The top-level response error code." },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
+    { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "0+",
+      "about": "The time in milliseconds for which the acquired records are locked." },
     { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
       "about": "The response topics.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 510a1318c59..4ad0bb43db3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1340,7 +1340,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tip0, partitionDataForFetch(tip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList()));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1386,7 +1386,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tip0, partitionDataForFetch(tip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records,
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList()));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1415,7 +1415,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records,
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
partitionDataMap.put(tip0, partitionDataForFetch(tip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList()));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1869,13 +1869,13 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(error.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1921,7 +1921,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1971,7 +1971,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -1980,7 +1980,7 @@ public class ShareConsumeRequestManagerTest {
.setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch()
.setLeaderId(tp0Leader.id())
.setLeaderEpoch(validLeaderEpoch + 1)));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, singletonList(tp0Leader)), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, singletonList(tp0Leader), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2019,7 +2019,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2069,13 +2069,13 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2112,7 +2112,7 @@ public class ShareConsumeRequestManagerTest {
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(error.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip0,
new ShareFetchResponseData.PartitionData()
@@ -2127,7 +2127,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2172,7 +2172,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2181,7 +2181,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2258,7 +2258,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2267,7 +2267,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2348,7 +2348,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2357,7 +2357,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2441,13 +2441,13 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2477,7 +2477,7 @@ public class ShareConsumeRequestManagerTest {
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId0, true);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0, true);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2485,7 +2485,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2523,7 +2523,7 @@ public class ShareConsumeRequestManagerTest {
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList()), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2542,7 +2542,7 @@ public class ShareConsumeRequestManagerTest {
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tp.topicPartition().partition())
.setErrorCode(error.code()));
- return ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
@@ -2559,7 +2559,7 @@ public class ShareConsumeRequestManagerTest {
Errors acknowledgeError) {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions
= Map.of(tp,
partitionDataForFetch(tp, records, acquiredRecords, error,
acknowledgeError));
- return ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 52b9eb93068..fa4997bbf08 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -32,6 +32,7 @@ import kafka.server.share.SharePartitionManager;

 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
@@ -68,6 +69,7 @@ public class KafkaApisBuilder {
     private ApiVersionManager apiVersionManager = null;
     private ClientMetricsManager clientMetricsManager = null;
     private Optional<ShareCoordinator> shareCoordinator = Optional.empty();
+    private GroupConfigManager groupConfigManager = null;

     public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
         this.requestChannel = requestChannel;
@@ -179,6 +181,11 @@ public class KafkaApisBuilder {
         return this;
     }

+    public KafkaApisBuilder setGroupConfigManager(GroupConfigManager groupConfigManager) {
+        this.groupConfigManager = groupConfigManager;
+        return this;
+    }
+
     @SuppressWarnings({"CyclomaticComplexity"})
     public KafkaApis build() {
         if (requestChannel == null) throw new RuntimeException("you must set requestChannel");
@@ -198,6 +205,7 @@ public class KafkaApisBuilder {
         if (clientMetricsManager == null) throw new RuntimeException("You must set clientMetricsManager");
         if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
         if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");
+        if (groupConfigManager == null) throw new RuntimeException("You must set groupConfigManager");

         return new KafkaApis(requestChannel,
                              forwardingManager,
@@ -220,6 +228,7 @@ public class KafkaApisBuilder {
                              time,
                              tokenManager,
                              apiVersionManager,
-                             clientMetricsManager);
+                             clientMetricsManager,
+                             groupConfigManager);
     }
 }
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index da4f8ef6668..070b972d110 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -258,4 +259,20 @@ public class ShareFetchUtils {
             return records;
         }
     }
+
+    /**
+     * The method is used to get the record lock duration for the group. If the group config is present,
+     * then the record lock duration is returned. Otherwise, the default value is returned.
+     *
+     * @param groupConfigManager The group config manager.
+     * @param groupId The group id for which the record lock duration is to be fetched.
+     * @param defaultValue The default value to be returned if the group config is not present.
+     * @return The record lock duration for the group.
+     */
+    public static int recordLockDurationMsOrDefault(GroupConfigManager groupConfigManager, String groupId, int defaultValue) {
+        if (groupConfigManager.groupConfig(groupId).isPresent()) {
+            return groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
+        }
+        return defaultValue;
+    }
 }
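Usage note (not part of the patch): the new helper gives callers such as KafkaApis a single place to resolve the effective lock duration. A minimal sketch, assuming a GroupConfigManager instance and a hypothetical group id and default value:

    // Falls back to the supplied default when no dynamic group config exists for the
    // group (i.e. the SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG broker value).
    int lockDurationMs = ShareFetchUtils.recordLockDurationMsOrDefault(
            groupConfigManager, "my-share-group", 30000);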
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 619469c8c9a..34966022d32 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -79,6 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
+import static kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;

 /**
  * The SharePartition is used to track the state of a partition that is shared between multiple
@@ -2302,12 +2303,7 @@ public class SharePartition {
         // The recordLockDuration value would depend on whether the dynamic config SHARE_RECORD_LOCK_DURATION_MS in
         // GroupConfig.java is set or not. If dynamic config is set, then that is used, otherwise the value of
         // SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG defined in ShareGroupConfig is used
-        int recordLockDurationMs;
-        if (groupConfigManager.groupConfig(groupId).isPresent()) {
-            recordLockDurationMs = groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
-        } else {
-            recordLockDurationMs = defaultRecordLockDurationMs;
-        }
+        int recordLockDurationMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs);
         return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs);
     }
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 4a8c373fa69..42081241fbc 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -145,11 +145,6 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;

-    /**
-     * The max fetch records is the maximum number of records that can be fetched by a share fetch request.
-     */
-    private final int maxFetchRecords;
-
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -157,7 +152,6 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int maxFetchRecords,
         Persister persister,
         GroupConfigManager groupConfigManager,
         BrokerTopicStats brokerTopicStats
@@ -169,7 +163,6 @@ public class SharePartitionManager implements AutoCloseable {
             defaultRecordLockDurationMs,
             maxDeliveryCount,
             maxInFlightMessages,
-            maxFetchRecords,
             persister,
             groupConfigManager,
             new ShareGroupMetrics(time),
@@ -185,7 +178,6 @@ public class SharePartitionManager implements AutoCloseable {
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int maxFetchRecords,
         Persister persister,
         GroupConfigManager groupConfigManager,
         ShareGroupMetrics shareGroupMetrics,
@@ -200,7 +192,6 @@ public class SharePartitionManager implements AutoCloseable {
             new SystemTimer("share-group-lock-timeout")),
             maxDeliveryCount,
             maxInFlightMessages,
-            maxFetchRecords,
             persister,
             groupConfigManager,
             shareGroupMetrics,
@@ -218,7 +209,6 @@ public class SharePartitionManager implements AutoCloseable {
         Timer timer,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int maxFetchRecords,
         Persister persister,
         GroupConfigManager groupConfigManager,
         ShareGroupMetrics shareGroupMetrics,
@@ -235,7 +225,6 @@ public class SharePartitionManager implements AutoCloseable {
         this.persister = persister;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = shareGroupMetrics;
-        this.maxFetchRecords = maxFetchRecords;
         this.brokerTopicStats = brokerTopicStats;
     }

@@ -246,6 +235,8 @@ public class SharePartitionManager implements AutoCloseable {
     * @param groupId The group id, this is used to identify the share group.
     * @param memberId The member id, generated by the group-coordinator, this is used to identify the client.
     * @param fetchParams The fetch parameters from the share fetch request.
+    * @param sessionEpoch The session epoch for the member.
+    * @param maxFetchRecords The maximum number of records to fetch.
     * @param batchSize The number of records per acquired records batch.
     * @param topicIdPartitions The topic partitions to fetch for.
     *
@@ -256,6 +247,7 @@ public class SharePartitionManager implements AutoCloseable {
        String memberId,
        FetchParams fetchParams,
        int sessionEpoch,
+       int maxFetchRecords,
        int batchSize,
        List<TopicIdPartition> topicIdPartitions
    ) {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index ace63454f93..0823ec683f7 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -438,7 +438,6 @@ class BrokerServer(
       config.shareGroupConfig.shareGroupRecordLockDurationMs,
       config.shareGroupConfig.shareGroupDeliveryCountLimit,
       config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
-      config.shareGroupConfig.shareFetchMaxFetchRecords,
       persister,
       groupConfigManager,
       brokerTopicStats
@@ -466,7 +465,8 @@ class BrokerServer(
       time = time,
       tokenManager = tokenManager,
       apiVersionManager = apiVersionManager,
-      clientMetricsManager = clientMetricsManager)
+      clientMetricsManager = clientMetricsManager,
+      groupConfigManager = groupConfigManager)

     dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
       socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index af851f3ed84..6de51d07332 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.share.SharePartitionManager
+import kafka.server.share.{ShareFetchUtils, SharePartitionManager}
 import kafka.utils.Logging
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.EndpointType
@@ -55,7 +55,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
+import org.apache.kafka.coordinator.group.{Group, GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.ClientMetricsManager
@@ -104,7 +104,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
                 val apiVersionManager: ApiVersionManager,
-                val clientMetricsManager: ClientMetricsManager
+                val clientMetricsManager: ClientMetricsManager,
+                val groupConfigManager: GroupConfigManager
 ) extends ApiRequestHandler with Logging {

   type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
@@ -3215,6 +3216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         shareFetchRequest.data.memberId,
         params,
         shareSessionEpoch,
+        shareFetchRequest.data.maxRecords,
         shareFetchRequest.data.batchSize,
         interestedTopicPartitions
       ).thenApply{ result =>
@@ -3749,7 +3751,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     // Prepare share fetch response
     val response =
-      ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs, responseData, nodeEndpoints.values.toList.asJava)
+      ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs, responseData, nodeEndpoints.values.toList.asJava,
+        ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs))

     // record the bytes out metrics only when the response is being sent.
     response.data.responses.forEach { topicResponse =>
       topicResponse.partitions.forEach { data =>
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index c00897eee25..b02fafe4ce4 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1044,19 +1044,19 @@ public class SharePartitionManagerTest {
doAnswer(invocation ->
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
CompletableFuture<Map<TopicIdPartition, PartitionData>> future =
sharePartitionManager.fetchMessages(
- groupId, memberId1.toString(), FETCH_PARAMS, 1, BATCH_SIZE,
topicIdPartitions);
+ groupId, memberId1.toString(), FETCH_PARAMS, 1, MAX_FETCH_RECORDS,
BATCH_SIZE, topicIdPartitions);
assertTrue(future.isDone());
Mockito.verify(mockReplicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
- future = sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), FETCH_PARAMS, 3, BATCH_SIZE,
- topicIdPartitions);
+ future = sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), FETCH_PARAMS, 3,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
assertTrue(future.isDone());
Mockito.verify(mockReplicaManager, times(2)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
- future = sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), FETCH_PARAMS, 10, BATCH_SIZE,
- topicIdPartitions);
+ future = sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), FETCH_PARAMS, 10,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
assertTrue(future.isDone());
Mockito.verify(mockReplicaManager, times(3)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -1150,7 +1150,7 @@ public class SharePartitionManagerTest {
for (int i = 0; i != threadCount; ++i) {
executorService.submit(() -> {
sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
});
// We are blocking the main thread at an interval of 10
threads so that the currently running executorService threads can complete.
if (i % 10 == 0)
@@ -1197,7 +1197,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
future.join();
@@ -1235,8 +1235,8 @@ public class SharePartitionManagerTest {
doAnswer(invocation ->
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
- sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
// Since the nextFetchOffset does not point to endOffset + 1, i.e.
some of the records in the cachedState are AVAILABLE,
// even though the maxInFlightMessages limit is exceeded,
replicaManager.readFromLog should be called
Mockito.verify(mockReplicaManager, times(1)).readFromLog(
@@ -2151,7 +2151,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
// Verify that the fetch request is completed.
TestUtils.waitForCondition(
future::isDone,
@@ -2221,7 +2221,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
// Verify that the fetch request is completed.
TestUtils.waitForCondition(
future::isDone,
@@ -2283,15 +2283,15 @@ public class SharePartitionManagerTest {
// Send 3 requests for share fetch for same share partition.
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future1 =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future2 =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future3 =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
Mockito.verify(sp0, times(3)).maybeInitialize();
Mockito.verify(mockReplicaManager,
times(3)).addDelayedShareFetchRequest(any(), any());
@@ -2350,7 +2350,7 @@ public class SharePartitionManagerTest {
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
LeaderNotAvailableException("Leader not available")));
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2366,7 +2366,7 @@ public class SharePartitionManagerTest {
// Return IllegalStateException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
IllegalStateException("Illegal state")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2380,7 +2380,7 @@ public class SharePartitionManagerTest {
// Return CoordinatorNotAvailableException to simulate initialization
failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
CoordinatorNotAvailableException("Coordinator not available")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2394,7 +2394,7 @@ public class SharePartitionManagerTest {
// Return InvalidRequestException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
InvalidRequestException("Invalid request")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2408,7 +2408,7 @@ public class SharePartitionManagerTest {
// Return FencedStateEpochException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
FencedStateEpochException("Fenced state epoch")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2422,7 +2422,7 @@ public class SharePartitionManagerTest {
// Return NotLeaderOrFollowerException to simulate initialization
failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
NotLeaderOrFollowerException("Not leader or follower")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2436,7 +2436,7 @@ public class SharePartitionManagerTest {
// Return RuntimeException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new
RuntimeException("Runtime exception")));
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2472,7 +2472,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2513,7 +2513,7 @@ public class SharePartitionManagerTest {
// Validate when exception is thrown.
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2523,7 +2523,7 @@ public class SharePartitionManagerTest {
// Validate when partition is not leader.
future = sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2592,7 +2592,7 @@ public class SharePartitionManagerTest {
// Validate when exception is thrown.
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
assertTrue(future.isDone());
assertFalse(future.isCompletedExceptionally());
@@ -2649,7 +2649,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0,
Errors.UNKNOWN_SERVER_ERROR, "Exception");
// Verify that the share partition is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
@@ -2658,7 +2658,7 @@ public class SharePartitionManagerTest {
doThrow(new NotLeaderOrFollowerException("Leader
exception")).when(mockReplicaManager).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0,
Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
assertTrue(partitionCacheMap.isEmpty());
// Should have 2 fetch recorded and 2 failures.
@@ -2711,7 +2711,7 @@ public class SharePartitionManagerTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0,
Errors.FENCED_STATE_EPOCH, "Fenced exception");
// Verify that tp1 is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
@@ -2726,7 +2726,7 @@ public class SharePartitionManagerTest {
doThrow(new FencedStateEpochException("Fenced exception
again")).when(mockReplicaManager).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, 0,
- BATCH_SIZE, topicIdPartitions);
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, List.of(tp0, tp1),
Errors.FENCED_STATE_EPOCH, "Fenced exception again");
assertTrue(partitionCacheMap.isEmpty());
// Should have 4 fetch recorded (2 fetch and 2 topics) and 3 failures
as sp1 was not acquired
@@ -2758,7 +2758,7 @@ public class SharePartitionManagerTest {
.build();
CompletableFuture<Map<TopicIdPartition, PartitionData>> future =
sharePartitionManager.fetchMessages(
- groupId, memberId.toString(), FETCH_PARAMS, 0, BATCH_SIZE,
topicIdPartitions);
+ groupId, memberId.toString(), FETCH_PARAMS, 0, MAX_FETCH_RECORDS,
BATCH_SIZE, topicIdPartitions);
assertTrue(future.isDone());
// Validate that the listener is registered.
verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
@@ -2821,39 +2821,39 @@ public class SharePartitionManagerTest {
// Capture the arguments passed to processShareFetch.
ArgumentCaptor<ShareFetch> captor =
ArgumentCaptor.forClass(ShareFetch.class);
- sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 0, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 0,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
verify(sharePartitionManager,
times(1)).processShareFetch(captor.capture());
// Verify the partitions rotation, no rotation.
ShareFetch resultShareFetch = captor.getValue();
validateRotatedListEquals(resultShareFetch.topicIdPartitions(),
topicIdPartitions, 0);
// Single rotation.
- sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 1, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 1,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
verify(sharePartitionManager,
times(2)).processShareFetch(captor.capture());
// Verify the partitions rotation, rotate by 1.
resultShareFetch = captor.getValue();
validateRotatedListEquals(topicIdPartitions,
resultShareFetch.topicIdPartitions(), 1);
// Rotation by 3, less that the number of partitions.
- sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 3, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 3,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
verify(sharePartitionManager,
times(3)).processShareFetch(captor.capture());
// Verify the partitions rotation, rotate by 3.
resultShareFetch = captor.getValue();
validateRotatedListEquals(topicIdPartitions,
resultShareFetch.topicIdPartitions(), 3);
// Rotation by 12, more than the number of partitions.
- sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 12, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, 12,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
verify(sharePartitionManager,
times(4)).processShareFetch(captor.capture());
// Verify the partitions rotation, rotate by 5 (12 % 7).
resultShareFetch = captor.getValue();
validateRotatedListEquals(topicIdPartitions,
resultShareFetch.topicIdPartitions(), 5);
// Rotation by Integer.MAX_VALUE, boundary test.
- sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, Integer.MAX_VALUE, BATCH_SIZE,
- topicIdPartitions);
+ sharePartitionManager.fetchMessages(groupId, memberId1.toString(),
FETCH_PARAMS, Integer.MAX_VALUE,
+ MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
verify(sharePartitionManager,
times(5)).processShareFetch(captor.capture());
// Verify the partitions rotation, rotate by 1 (2147483647 % 7).
resultShareFetch = captor.getValue();
@@ -3123,7 +3123,6 @@ public class SharePartitionManagerTest {
timer,
MAX_DELIVERY_COUNT,
MAX_IN_FLIGHT_MESSAGES,
- MAX_FETCH_RECORDS,
persister,
mock(GroupConfigManager.class),
shareGroupMetrics,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1abf4b6297b..463a46ac121 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -75,7 +75,7 @@ import
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator,
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -144,6 +144,7 @@ class KafkaApisTest extends Logging {
private val fetchManager: FetchManager = mock(classOf[FetchManager])
private val sharePartitionManager: SharePartitionManager =
mock(classOf[SharePartitionManager])
private val clientMetricsManager: ClientMetricsManager =
mock(classOf[ClientMetricsManager])
+ private val groupConfigManager: GroupConfigManager =
mock(classOf[GroupConfigManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
@@ -209,7 +210,8 @@ class KafkaApisTest extends Logging {
time = time,
tokenManager = null,
apiVersionManager = apiVersionManager,
- clientMetricsManager = clientMetricsManager)
+ clientMetricsManager = clientMetricsManager,
+ groupConfigManager = groupConfigManager)
}
private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
@@ -3825,7 +3827,7 @@ class KafkaApisTest extends Logging {
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -3894,7 +3896,7 @@ class KafkaApisTest extends Logging {
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -3997,7 +3999,7 @@ class KafkaApisTest extends Logging {
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -4098,7 +4100,7 @@ class KafkaApisTest extends Logging {
addTopicToMetadataCache(topicName, 1, topicId = topicId)
val memberId: Uuid = Uuid.ZERO_UUID
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
@@ -4148,7 +4150,7 @@ class KafkaApisTest extends Logging {
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -4222,7 +4224,7 @@ class KafkaApisTest extends Logging {
val groupId = "group"
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
FutureUtils.failedFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
)
@@ -4285,7 +4287,7 @@ class KafkaApisTest extends Logging {
val records = MemoryRecords.EMPTY
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -4348,7 +4350,7 @@ class KafkaApisTest extends Logging {
val groupId = "group"
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, partitionIndex, topicName) ->
new ShareFetchResponseData.PartitionData()
@@ -4438,7 +4440,7 @@ class KafkaApisTest extends Logging {
val groupId = "group"
val records = memoryRecords(10, 0)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -4528,7 +4530,7 @@ class KafkaApisTest extends Logging {
val records2 = memoryRecords(10, 10)
val records3 = memoryRecords(10, 20)
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
@@ -4752,7 +4754,7 @@ class KafkaApisTest extends Logging {
val groupId = "group"
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)) ->
new ShareFetchResponseData.PartitionData()
@@ -5211,7 +5213,7 @@ class KafkaApisTest extends Logging {
val tp2 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
tp1 ->
new ShareFetchResponseData.PartitionData()
@@ -5361,7 +5363,7 @@ class KafkaApisTest extends Logging {
val tp2 = new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1))
val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
tp1 ->
new ShareFetchResponseData.PartitionData()
@@ -5495,7 +5497,7 @@ class KafkaApisTest extends Logging {
val tp2 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
tp1 ->
new ShareFetchResponseData.PartitionData()
@@ -5635,7 +5637,7 @@ class KafkaApisTest extends Logging {
val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
val tp4 = new TopicIdPartition(topicId3, new TopicPartition(topicName3, 0))
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
tp2 ->
new ShareFetchResponseData.PartitionData()
@@ -5808,7 +5810,7 @@ class KafkaApisTest extends Logging {
val groupId = "group"
- when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), any())).thenReturn(
+ when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(),
anyInt(), anyInt(), any())).thenReturn(
CompletableFuture.completedFuture(Map[TopicIdPartition,
ShareFetchResponseData.PartitionData](
new TopicIdPartition(topicId, new TopicPartition(topicName,
partitionIndex)) ->
new ShareFetchResponseData.PartitionData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 88cbe22769f..1f6c3755131 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1031,7 +1031,6 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>
//ignore string
- case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 68fdb40fb22..9fc3165bd7b 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -60,7 +60,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
- assertEquals(Errors.UNSUPPORTED_VERSION.code(),
shareFetchResponse.data().errorCode())
+ assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareFetchResponse.data.errorCode)
+ assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
}
@ClusterTest(
@@ -75,7 +76,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId,
metadata, Map.empty)
val shareAcknowledgeResponse =
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
- assertEquals(Errors.UNSUPPORTED_VERSION.code(),
shareAcknowledgeResponse.data().errorCode())
+ assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareAcknowledgeResponse.data.errorCode)
}
@ClusterTests(
@@ -127,6 +128,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Send the share fetch request to the non-replica and verify the error
code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, Map.empty)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
+ assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs)
+
val partitionData =
shareFetchResponse.responseData(topicNames).get(topicIdPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
assertEquals(leader, partitionData.currentLeader().leaderId())
@@ -184,6 +187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -258,6 +262,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
val partitionsCount =
shareFetchResponseData.responses().get(0).partitions().size()
if (partitionsCount > 0) {
@@ -376,6 +381,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
+ assertEquals(30000, shareFetchResponseData1.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData1.responses().size())
assertEquals(topicId, shareFetchResponseData1.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData1.responses().get(0).partitions().size())
@@ -383,6 +389,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData2 = shareFetchResponse2.data()
assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
+ assertEquals(30000, shareFetchResponseData2.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData2.responses().size())
assertEquals(topicId, shareFetchResponseData2.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData2.responses().get(0).partitions().size())
@@ -472,6 +479,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -520,6 +528,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -541,7 +550,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
- new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "15000")
)
),
new ClusterTest(
@@ -553,7 +563,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
new ClusterConfigProperty(key = "group.share.persister.class.name",
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
new ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "1"),
- new ClusterConfigProperty(key = "unstable.api.versions.enable",
value = "true")
+ new ClusterConfigProperty(key = "unstable.api.versions.enable",
value = "true"),
+ new ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "15000")
)
),
)
@@ -573,7 +584,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
// Send the first share fetch request to initialize the share partition
- sendFirstShareFetchRequest(memberId, groupId, send)
+ sendFirstShareFetchRequest(memberId, groupId, send, 15000)
initProducer()
// Producing 10 records to the topic created above
@@ -588,6 +599,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -616,6 +628,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -640,6 +653,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -708,6 +722,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -753,6 +768,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -821,6 +837,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -865,6 +882,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId,
shareFetchResponseData.responses().get(0).topicId())
val responseSize =
shareFetchResponseData.responses().get(0).partitions().size()
@@ -938,6 +956,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -986,6 +1005,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1054,6 +1074,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1082,6 +1103,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1106,6 +1128,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1176,6 +1199,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1221,6 +1245,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1268,6 +1293,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1534,6 +1560,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1562,6 +1589,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1587,6 +1615,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(0, shareFetchResponseData.responses().size()) // responses
list will be empty because there are no responses for the final fetch request
}
@@ -1644,6 +1673,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1672,6 +1702,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1764,6 +1795,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
// The response will have a top level error code because this is an
Initial Fetch request with acknowledgement data present
assertEquals(Errors.INVALID_REQUEST.code(),
shareFetchResponseData.errorCode)
+ assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
}
@ClusterTests(
@@ -1870,6 +1902,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1946,6 +1979,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2028,6 +2062,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2105,6 +2140,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2194,6 +2230,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
val partitionsCount =
shareFetchResponseData.responses().get(0).partitions().size()
if (partitionsCount > 0) {
@@ -2220,6 +2257,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2234,9 +2272,143 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
+ @ClusterTests(
+ Array(
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ ),
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.persister.class.name",
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "1"),
+ new ClusterConfigProperty(key = "unstable.api.versions.enable",
value = "true")
+ )
+ )
+ )
+ )
+ def testShareFetchRequestWithMaxRecordsAndBatchSize(): Unit = {
+ val groupId: String = "group"
+ val memberId = Uuid.randomUuid
+ val topic = "topic"
+ val partition = 0
+
+ createTopicAndReturnLeaders(topic, numPartitions = 3)
+ val topicIds = getTopicIds.asJava
+ val topicId = topicIds.get(topic)
+ val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
+ initProducer()
+ // Producing 10 records to the topic created above
+ produceData(topicIdPartition, 10)
+
+ // Send the second share fetch request to fetch the records produced above
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1)
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+ val shareFetchResponseData = shareFetchResponse.data
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
+ assertEquals(1, shareFetchResponseData.responses.size)
+ assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
+ assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
+
+ val expectedPartitionData = new ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.NONE.code)
+ .setAcknowledgeErrorCode(Errors.NONE.code)
+ .setAcquiredRecords(expectedAcquiredRecords(util.List.of(0),
util.List.of(0), util.List.of(1)))
+
+ val partitionData =
shareFetchResponseData.responses.get(0).partitions.get(0)
+ compareFetchResponsePartitions(expectedPartitionData, partitionData)
+ }
+
+ @ClusterTests(
+ Array(
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ ),
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.persister.class.name",
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "1"),
+ new ClusterConfigProperty(key = "unstable.api.versions.enable",
value = "true")
+ )
+ )
+ )
+ )
+ def testShareFetchRequestMultipleBatchesWithMaxRecordsAndBatchSize(): Unit =
{
+ val groupId: String = "group"
+ val memberId = Uuid.randomUuid
+ val topic = "topic"
+ val partition = 0
+
+ createTopicAndReturnLeaders(topic, numPartitions = 3)
+ val topicIds = getTopicIds.asJava
+ val topicId = topicIds.get(topic)
+ val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
+ initProducer()
+ // Producing 10 records to the topic created above
+ produceData(topicIdPartition, 10)
+
+ // Send the second share fetch request to fetch the records produced above
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata, send,
Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1)
+ val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+ val shareFetchResponseData = shareFetchResponse.data
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
+ assertEquals(1, shareFetchResponseData.responses.size)
+ assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
+ assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
+
+ val expectedPartitionData = new ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.NONE.code)
+ .setAcknowledgeErrorCode(Errors.NONE.code)
+ .setAcquiredRecords(expectedAcquiredRecords(util.List.of(0, 1, 2, 3, 4),
util.List.of(0, 1, 2, 3, 4), util.List.of(1, 1, 1, 1, 1)))
+
+ val partitionData =
shareFetchResponseData.responses.get(0).partitions.get(0)
+ compareFetchResponsePartitions(expectedPartitionData, partitionData)
+ }
+
// For initial fetch request, the response may not be available in the first
attempt when the share
// partition is not initialized yet. Hence, wait for response from all
partitions before proceeding.
- private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String,
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
+ private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String,
topicIdPartitions: Seq[TopicIdPartition], lockTimeout: Int = 30000): Unit = {
val partitions: util.Set[Integer] = new util.HashSet()
TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
@@ -2245,6 +2417,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(lockTimeout,
shareFetchResponseData.acquisitionLockTimeoutMs)
shareFetchResponseData.responses().asScala.foreach(response => {
if (!response.partitions().isEmpty) {
response.partitions().forEach(partitionData =>
partitions.add(partitionData.partitionIndex))
@@ -2270,7 +2443,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
actualResponse:
ShareFetchResponseData.PartitionData): Unit = {
assertEquals(expectedResponse.partitionIndex,
actualResponse.partitionIndex)
assertEquals(expectedResponse.errorCode, actualResponse.errorCode)
- assertEquals(expectedResponse.errorCode, actualResponse.errorCode)
+ assertEquals(expectedResponse.errorMessage, actualResponse.errorMessage)
assertEquals(expectedResponse.acknowledgeErrorCode,
actualResponse.acknowledgeErrorCode)
assertEquals(expectedResponse.acquiredRecords,
actualResponse.acquiredRecords)
}
@@ -2289,8 +2462,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
maxWaitMs: Int = MAX_WAIT_MS,
minBytes: Int = 0,
maxBytes: Int = Int.MaxValue,
+ maxRecords: Int = 500,
batchSize: Int = 500): ShareFetchRequest
= {
- ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, batchSize, send.asJava, forget.asJava,
acknowledgementsMap.asJava)
+ ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, maxRecords, batchSize, send.asJava, forget.asJava,
acknowledgementsMap.asJava)
.build()
}
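
Note on the test helper above: createShareFetchRequest now takes a maxRecords parameter (default 500) next to batchSize and forwards both to ShareFetchRequest.Builder.forConsumer. The following is a minimal Java sketch of the updated builder call, inferred from the call sites in this diff; the ten-argument forConsumer order (maxRecords before batchSize), the ShareRequestMetadata epoch handling and the accessor names are taken from the diff, while the group id, topic, timeout values and package locations of the imports are assumptions for illustration only.

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareRequestMetadata;

public class ShareFetchRequestSketch {
    // Builds a ShareFetch request that caps the number of acquired records at 1,
    // mirroring the maxRecords/batchSize arguments exercised by the tests above.
    public static ShareFetchRequest buildCappedRequest(Uuid topicId) {
        ShareRequestMetadata metadata = new ShareRequestMetadata(
            Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
        List<TopicIdPartition> send =
            List.of(new TopicIdPartition(topicId, new TopicPartition("topic", 0)));
        Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgements = Map.of();
        return ShareFetchRequest.Builder.forConsumer(
                "group", metadata, 5000 /* maxWaitMs */, 0 /* minBytes */, Integer.MAX_VALUE /* maxBytes */,
                1 /* maxRecords */, 1 /* batchSize */, send, List.of() /* forget */, acknowledgements)
            .build();
    }
}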
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 4e34e15fee9..e216bd4037c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
@@ -76,14 +75,8 @@ public class ShareGroupConfig {
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The
class name of share persister for share group. The class should implement " +
"the <code>org.apache.kafka.server.share.Persister</code> interface.";
- // Broker temporary configuration to limit the number of records fetched
by a share fetch request.
- public static final String SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG =
"share.fetch.max.fetch.records";
- public static final int SHARE_FETCH_MAX_FETCH_RECORDS_DEFAULT =
Integer.MAX_VALUE;
- public static final String SHARE_FETCH_MAX_FETCH_RECORDS_DOC = "The
maximum number of records that can be fetched by a share fetch request.";
-
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
- .defineInternal(SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG, INT,
SHARE_FETCH_MAX_FETCH_RECORDS_DEFAULT, null, HIGH,
SHARE_FETCH_MAX_FETCH_RECORDS_DOC)
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
@@ -102,7 +95,6 @@ public class ShareGroupConfig {
private final int shareGroupMinRecordLockDurationMs;
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final String shareGroupPersisterClassName;
- private final int shareFetchMaxFetchRecords;
public ShareGroupConfig(AbstractConfig config) {
// Share groups are enabled in two cases: 1) The internal
configuration to enable it is
@@ -119,7 +111,6 @@ public class ShareGroupConfig {
shareGroupMinRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
shareFetchPurgatoryPurgeIntervalRequests =
config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
shareGroupPersisterClassName =
config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
- shareFetchMaxFetchRecords =
config.getInt(ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG);
validate();
}
@@ -160,10 +151,6 @@ public class ShareGroupConfig {
return shareGroupPersisterClassName;
}
- public int shareFetchMaxFetchRecords() {
- return shareFetchMaxFetchRecords;
- }
-
private void validate() {
Utils.require(shareGroupRecordLockDurationMs >=
shareGroupMinRecordLockDurationMs,
String.format("%s must be greater than or equal to %s",
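
With the internal share.fetch.max.fetch.records configuration removed above, the record cap now travels in each ShareFetch request, while the acquisition lock duration remains a broker-level setting. A minimal sketch of broker properties matching the values exercised by the tests earlier in this diff; the keys and values come from the ClusterConfigProperty entries, everything else is illustrative.

import java.util.Properties;

public class ShareGroupBrokerPropsSketch {
    public static Properties shareGroupProps() {
        Properties props = new Properties();
        // Enable share groups; mirrors the ClusterConfigProperty entries in the tests above.
        props.put("group.share.enable", "true");
        props.put("group.coordinator.rebalance.protocols", "classic,consumer,share");
        // Drives AcquisitionLockTimeoutMs in ShareFetch responses: the tests assert 15000
        // when this is set, and 30000 with the default lock duration.
        props.put("group.share.record.lock.duration.ms", "15000");
        return props;
    }
}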
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 31d88091576..b4779a5fd3b 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -119,6 +120,7 @@ public class KRaftMetadataRequestBenchmark {
private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
private final SharePartitionManager sharePartitionManager =
Mockito.mock(SharePartitionManager.class);
private final ClientMetricsManager clientMetricsManager =
Mockito.mock(ClientMetricsManager.class);
+ private final GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
private final BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(false);
private final KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
@Param({"500", "1000", "5000"})
@@ -206,6 +208,7 @@ public class KRaftMetadataRequestBenchmark {
ApiMessageType.ListenerType.BROKER,
false,
() ->
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
+ setGroupConfigManager(groupConfigManager).
build();
}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 1be37ae87b7..9abc5591ddd 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -55,7 +55,7 @@ public class FinalContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
log.debug("Final context returning {}",
partitionsToLogString(updates.keySet()));
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
- updates.entrySet().iterator(), List.of()));
+ updates.entrySet().iterator(), List.of(), 0));
}
@Override
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 36062616cec..36bf67ad828 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -51,7 +51,7 @@ public abstract class ShareFetchContext {
*/
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of()));
+ Collections.emptyIterator(), List.of(), 0));
}
/**
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index 99eb92a85b9..c3d177c8808 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -103,7 +103,7 @@ public class ShareSessionContext extends ShareFetchContext {
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
if (!isSubsequent) {
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of()));
+ Collections.emptyIterator(), List.of(), 0));
}
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -114,10 +114,10 @@ public class ShareSessionContext extends
ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but got
{}. " +
"Possible duplicate request.", session.key(),
expectedEpoch, sessionEpoch);
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- throttleTimeMs, Collections.emptyIterator(), List.of()));
+ throttleTimeMs, Collections.emptyIterator(), List.of(),
0));
}
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), List.of()));
+ Collections.emptyIterator(), List.of(), 0));
}
/**
@@ -196,7 +196,7 @@ public class ShareSessionContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
+ Errors.NONE, 0, updates.entrySet().iterator(), List.of(),
0));
} else {
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -207,7 +207,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but
got {}. Possible duplicate request.",
session.key(), expectedEpoch, sessionEpoch);
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- 0, Collections.emptyIterator(), List.of()));
+ 0, Collections.emptyIterator(), List.of(), 0));
}
// Iterate over the update list using PartitionIterator. This will
prune updates which don't need to be sent
Iterator<Map.Entry<TopicIdPartition,
ShareFetchResponseData.PartitionData>> partitionIterator = new
PartitionIterator(
@@ -218,7 +218,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session context with session key {}
returning {}", session.key(),
partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
+ Errors.NONE, 0, updates.entrySet().iterator(), List.of(),
0));
}
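
The three context classes above all pass a new trailing argument to ShareFetchResponse.toMessage, which carries the acquisition lock timeout into the response; on these throttle and error paths it is reported as 0. A minimal sketch of the updated call shape, inferred from these call sites rather than from the method's declaration; package locations of the imports are assumed.

import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;

public class ShareFetchResponseSketch {
    // Builds an empty throttled response; the final int is the new acquisition lock
    // timeout field, set to 0 here as in the context classes above.
    public static ShareFetchResponse emptyThrottledResponse(int throttleTimeMs) {
        return new ShareFetchResponse(ShareFetchResponse.toMessage(
            Errors.NONE, throttleTimeMs, Collections.emptyIterator(), List.of(), 0));
    }
}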
}