This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4aa81204ff8 KAFKA-19018,KAFKA-19063: Implement maxRecords and 
acquisition lock timeout in share fetch request and response resp. (#19334)
4aa81204ff8 is described below

commit 4aa81204ff840bbd3e07f2b472ffbafc530225e9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Apr 1 12:23:06 2025 +0100

    KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout 
in share fetch request and response resp. (#19334)
    
    PR add `MaxRecords` to share fetch request and also adds
    `AcquisitionLockTimeout` to share fetch response. PR also removes
    internal broker config of `max.fetch.records`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../consumer/internals/ShareSessionHandler.java    |   2 +-
 .../kafka/common/requests/ShareFetchRequest.java   |   5 +-
 .../kafka/common/requests/ShareFetchResponse.java  |   9 +-
 .../common/message/ShareFetchRequest.json          |   4 +-
 .../common/message/ShareFetchResponse.json         |   2 +
 .../internals/ShareConsumeRequestManagerTest.java  |  52 +++---
 .../kafka/server/builders/KafkaApisBuilder.java    |  11 +-
 .../java/kafka/server/share/ShareFetchUtils.java   |  17 ++
 .../java/kafka/server/share/SharePartition.java    |   8 +-
 .../kafka/server/share/SharePartitionManager.java  |  14 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  11 +-
 .../server/share/SharePartitionManagerTest.java    |  81 +++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  38 +++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 -
 .../server/ShareFetchAcknowledgeRequestTest.scala  | 190 ++++++++++++++++++++-
 .../group/modern/share/ShareGroupConfig.java       |  13 --
 .../metadata/KRaftMetadataRequestBenchmark.java    |   3 +
 .../kafka/server/share/context/FinalContext.java   |   2 +-
 .../server/share/context/ShareFetchContext.java    |   2 +-
 .../server/share/context/ShareSessionContext.java  |  12 +-
 21 files changed, 334 insertions(+), 147 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index e88318217ad..34a109944be 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -180,7 +180,7 @@ public class ShareSessionHandler {
         return ShareFetchRequest.Builder.forConsumer(
                 groupId, nextMetadata, fetchConfig.maxWaitMs,
                 fetchConfig.minBytes, fetchConfig.maxBytes, 
fetchConfig.maxPollRecords,
-                added, removed, acknowledgementBatches);
+                fetchConfig.maxPollRecords, added, removed, 
acknowledgementBatches);
     }
 
     public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String 
groupId, FetchConfig fetchConfig) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 7dcf311bd7f..215fb6e90c7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -46,8 +46,8 @@ public class ShareFetchRequest extends AbstractRequest {
         }
 
         public static Builder forConsumer(String groupId, ShareRequestMetadata 
metadata,
-                                          int maxWait, int minBytes, int 
maxBytes, int batchSize,
-                                          List<TopicIdPartition> send, 
List<TopicIdPartition> forget,
+                                          int maxWait, int minBytes, int 
maxBytes, int maxRecords,
+                                          int batchSize, 
List<TopicIdPartition> send, List<TopicIdPartition> forget,
                                           Map<TopicIdPartition, 
List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
             ShareFetchRequestData data = new ShareFetchRequestData();
             data.setGroupId(groupId);
@@ -62,6 +62,7 @@ public class ShareFetchRequest extends AbstractRequest {
             data.setMaxWaitMs(maxWait);
             data.setMinBytes(minBytes);
             data.setMaxBytes(maxBytes);
+            data.setMaxRecords(maxRecords);
             data.setBatchSize(batchSize);
 
             // Build a map of topics to fetch keyed by topic ID, and within 
each a map of partitions keyed by index
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 619e740029d..0c40ea2039a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -144,7 +144,7 @@ public class ShareFetchResponse extends AbstractResponse {
                              Iterator<Map.Entry<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, 
Collections.emptyList());
+        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, 
Collections.emptyList(), 0);
         ObjectSerializationCache cache = new ObjectSerializationCache();
         return 4 + data.size(cache, version);
     }
@@ -159,13 +159,13 @@ public class ShareFetchResponse extends AbstractResponse {
     public static ShareFetchResponse of(Errors error,
                                         int throttleTimeMs,
                                         LinkedHashMap<TopicIdPartition, 
ShareFetchResponseData.PartitionData> responseData,
-                                        List<Node> nodeEndpoints) {
-        return new ShareFetchResponse(toMessage(error, throttleTimeMs, 
responseData.entrySet().iterator(), nodeEndpoints));
+                                        List<Node> nodeEndpoints, int 
acquisitionLockTimeout) {
+        return new ShareFetchResponse(toMessage(error, throttleTimeMs, 
responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
     }
 
     public static ShareFetchResponseData toMessage(Errors error, int 
throttleTimeMs,
                                                    
Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> 
partIterator,
-                                                   List<Node> nodeEndpoints) {
+                                                   List<Node> nodeEndpoints, 
int acquisitionLockTimeout) {
         Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> 
topicResponseList = new LinkedHashMap<>();
         while (partIterator.hasNext()) {
             Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> 
entry = partIterator.next();
@@ -193,6 +193,7 @@ public class ShareFetchResponse extends AbstractResponse {
                         .setRack(endpoint.rack())));
         return data.setThrottleTimeMs(throttleTimeMs)
                 .setErrorCode(error.code())
+                .setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
                 .setResponses(new ArrayList<>(topicResponseList.values()));
     }
 
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json 
b/clients/src/main/resources/common/message/ShareFetchRequest.json
index e85fc095861..82ac3093db2 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -36,7 +36,9 @@
     { "name": "MinBytes", "type": "int32", "versions": "0+",
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": 
"0x7fffffff",
-      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
+      "about": "The maximum bytes to fetch. See KIP-74 for cases where this 
limit may not be honored." },
+    { "name": "MaxRecords", "type": "int32", "versions": "0+",
+      "about": "The maximum number of records to fetch. This limit can be 
exceeded for alignment of batch boundaries." },
     { "name": "BatchSize", "type": "int32", "versions": "0+",
       "about": "The optimal number of records for batches of acquired records 
and acknowledgements." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json 
b/clients/src/main/resources/common/message/ShareFetchResponse.json
index 858b0bdd46f..ed459f304af 100644
--- a/clients/src/main/resources/common/message/ShareFetchResponse.json
+++ b/clients/src/main/resources/common/message/ShareFetchResponse.json
@@ -39,6 +39,8 @@
       "about": "The top-level response error code." },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
+    { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "0+",
+      "about": "The time in milliseconds for which the acquired records are 
locked." },
     { "name": "Responses", "type": "[]ShareFetchableTopicResponse", 
"versions": "0+",
       "about": "The response topics.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 510a1318c59..4ad0bb43db3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1340,7 +1340,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
         partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, 
acquiredRecords, Errors.NONE, Errors.NONE));
         partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, 
acquiredRecords, Errors.NONE, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList()));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1386,7 +1386,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
         partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, 
acquiredRecords, Errors.NONE, Errors.NONE));
         partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, 
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList()));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1415,7 +1415,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionDataMap = new LinkedHashMap<>();
         partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, 
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
         partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, 
acquiredRecords, Errors.NONE, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList()));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, 
partitionDataMap, Collections.emptyList(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1869,13 +1869,13 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(error.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -1921,7 +1921,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -1971,7 +1971,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
@@ -1980,7 +1980,7 @@ public class ShareConsumeRequestManagerTest {
                 .setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch()
                     .setLeaderId(tp0Leader.id())
                     .setLeaderEpoch(validLeaderEpoch + 1)));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, singletonList(tp0Leader)), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, singletonList(tp0Leader), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2019,7 +2019,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2069,13 +2069,13 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2112,7 +2112,7 @@ public class ShareConsumeRequestManagerTest {
                 .setPartitionIndex(tip0.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code())
                 .setAcknowledgeErrorCode(error.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip0,
             new ShareFetchResponseData.PartitionData()
@@ -2127,7 +2127,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2172,7 +2172,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
                 new ShareFetchResponseData.PartitionData()
@@ -2181,7 +2181,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2258,7 +2258,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
                 new ShareFetchResponseData.PartitionData()
@@ -2267,7 +2267,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2348,7 +2348,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
                 new ShareFetchResponseData.PartitionData()
@@ -2357,7 +2357,7 @@ public class ShareConsumeRequestManagerTest {
                         .setRecords(records)
                         
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
                         .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2441,13 +2441,13 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2477,7 +2477,7 @@ public class ShareConsumeRequestManagerTest {
                 .setPartitionIndex(tip0.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code())
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0, true);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0, true);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
@@ -2485,7 +2485,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2523,7 +2523,7 @@ public class ShareConsumeRequestManagerTest {
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2542,7 +2542,7 @@ public class ShareConsumeRequestManagerTest {
                 new ShareFetchResponseData.PartitionData()
                         .setPartitionIndex(tp.topicPartition().partition())
                         .setErrorCode(error.code()));
-        return ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
@@ -2559,7 +2559,7 @@ public class ShareConsumeRequestManagerTest {
                                                  Errors acknowledgeError) {
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions 
= Map.of(tp,
                 partitionDataForFetch(tp, records, acquiredRecords, error, 
acknowledgeError));
-        return ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 52b9eb93068..fa4997bbf08 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -32,6 +32,7 @@ import kafka.server.share.SharePartitionManager;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
@@ -68,6 +69,7 @@ public class KafkaApisBuilder {
     private ApiVersionManager apiVersionManager = null;
     private ClientMetricsManager clientMetricsManager = null;
     private Optional<ShareCoordinator> shareCoordinator = Optional.empty();
+    private GroupConfigManager groupConfigManager = null;
 
     public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
         this.requestChannel = requestChannel;
@@ -179,6 +181,11 @@ public class KafkaApisBuilder {
         return this;
     }
 
+    public KafkaApisBuilder setGroupConfigManager(GroupConfigManager 
groupConfigManager) {
+        this.groupConfigManager = groupConfigManager;
+        return this;
+    }
+
     @SuppressWarnings({"CyclomaticComplexity"})
     public KafkaApis build() {
         if (requestChannel == null) throw new RuntimeException("you must set 
requestChannel");
@@ -198,6 +205,7 @@ public class KafkaApisBuilder {
         if (clientMetricsManager == null) throw new RuntimeException("You must 
set clientMetricsManager");
         if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
+        if (groupConfigManager == null) throw new RuntimeException("You must 
set groupConfigManager");
 
         return new KafkaApis(requestChannel,
                              forwardingManager,
@@ -220,6 +228,7 @@ public class KafkaApisBuilder {
                              time,
                              tokenManager,
                              apiVersionManager,
-                             clientMetricsManager);
+                             clientMetricsManager,
+                             groupConfigManager);
     }
 }
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index da4f8ef6668..070b972d110 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -258,4 +259,20 @@ public class ShareFetchUtils {
             return records;
         }
     }
+
+    /**
+     * The method is used to get the record lock duration for the group. If 
the group config is present,
+     * then the record lock duration is returned. Otherwise, the default value 
is returned.
+     *
+     * @param groupConfigManager The group config manager.
+     * @param groupId The group id for which the record lock duration is to be 
fetched.
+     * @param defaultValue The default value to be returned if the group 
config is not present.
+     * @return The record lock duration for the group.
+     */
+    public static int recordLockDurationMsOrDefault(GroupConfigManager 
groupConfigManager, String groupId, int defaultValue) {
+        if (groupConfigManager.groupConfig(groupId).isPresent()) {
+            return 
groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
+        }
+        return defaultValue;
+    }
 }
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 619469c8c9a..34966022d32 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -79,6 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
 import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
+import static kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;
 
 /**
  * The SharePartition is used to track the state of a partition that is shared 
between multiple
@@ -2302,12 +2303,7 @@ public class SharePartition {
         // The recordLockDuration value would depend on whether the dynamic 
config SHARE_RECORD_LOCK_DURATION_MS in
         // GroupConfig.java is set or not. If dynamic config is set, then that 
is used, otherwise the value of
         // SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG defined in 
ShareGroupConfig is used
-        int recordLockDurationMs;
-        if (groupConfigManager.groupConfig(groupId).isPresent()) {
-            recordLockDurationMs = 
groupConfigManager.groupConfig(groupId).get().shareRecordLockDurationMs();
-        } else {
-            recordLockDurationMs = defaultRecordLockDurationMs;
-        }
+        int recordLockDurationMs = 
recordLockDurationMsOrDefault(groupConfigManager, groupId, 
defaultRecordLockDurationMs);
         return scheduleAcquisitionLockTimeout(memberId, firstOffset, 
lastOffset, recordLockDurationMs);
     }
 
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 4a8c373fa69..42081241fbc 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -145,11 +145,6 @@ public class SharePartitionManager implements 
AutoCloseable {
      */
     private final BrokerTopicStats brokerTopicStats;
 
-    /**
-     * The max fetch records is the maximum number of records that can be 
fetched by a share fetch request.
-     */
-    private final int maxFetchRecords;
-
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -157,7 +152,6 @@ public class SharePartitionManager implements AutoCloseable 
{
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int maxFetchRecords,
         Persister persister,
         GroupConfigManager groupConfigManager,
         BrokerTopicStats brokerTopicStats
@@ -169,7 +163,6 @@ public class SharePartitionManager implements AutoCloseable 
{
             defaultRecordLockDurationMs,
             maxDeliveryCount,
             maxInFlightMessages,
-            maxFetchRecords,
             persister,
             groupConfigManager,
             new ShareGroupMetrics(time),
@@ -185,7 +178,6 @@ public class SharePartitionManager implements AutoCloseable 
{
         int defaultRecordLockDurationMs,
         int maxDeliveryCount,
         int maxInFlightMessages,
-        int maxFetchRecords,
         Persister persister,
         GroupConfigManager groupConfigManager,
         ShareGroupMetrics shareGroupMetrics,
@@ -200,7 +192,6 @@ public class SharePartitionManager implements AutoCloseable 
{
                 new SystemTimer("share-group-lock-timeout")),
             maxDeliveryCount,
             maxInFlightMessages,
-            maxFetchRecords,
             persister,
             groupConfigManager,
             shareGroupMetrics,
@@ -218,7 +209,6 @@ public class SharePartitionManager implements AutoCloseable 
{
             Timer timer,
             int maxDeliveryCount,
             int maxInFlightMessages,
-            int maxFetchRecords,
             Persister persister,
             GroupConfigManager groupConfigManager,
             ShareGroupMetrics shareGroupMetrics,
@@ -235,7 +225,6 @@ public class SharePartitionManager implements AutoCloseable 
{
         this.persister = persister;
         this.groupConfigManager = groupConfigManager;
         this.shareGroupMetrics = shareGroupMetrics;
-        this.maxFetchRecords = maxFetchRecords;
         this.brokerTopicStats = brokerTopicStats;
     }
 
@@ -246,6 +235,8 @@ public class SharePartitionManager implements AutoCloseable 
{
      * @param groupId The group id, this is used to identify the share group.
      * @param memberId The member id, generated by the group-coordinator, this 
is used to identify the client.
      * @param fetchParams The fetch parameters from the share fetch request.
+     * @param sessionEpoch The session epoch for the member.
+     * @param maxFetchRecords The maximum number of records to fetch.
      * @param batchSize The number of records per acquired records batch.
      * @param topicIdPartitions The topic partitions to fetch for.
      *
@@ -256,6 +247,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         String memberId,
         FetchParams fetchParams,
         int sessionEpoch,
+        int maxFetchRecords,
         int batchSize,
         List<TopicIdPartition> topicIdPartitions
     ) {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index ace63454f93..0823ec683f7 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -438,7 +438,6 @@ class BrokerServer(
         config.shareGroupConfig.shareGroupRecordLockDurationMs,
         config.shareGroupConfig.shareGroupDeliveryCountLimit,
         config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
-        config.shareGroupConfig.shareFetchMaxFetchRecords,
         persister,
         groupConfigManager,
         brokerTopicStats
@@ -466,7 +465,8 @@ class BrokerServer(
         time = time,
         tokenManager = tokenManager,
         apiVersionManager = apiVersionManager,
-        clientMetricsManager = clientMetricsManager)
+        clientMetricsManager = clientMetricsManager,
+        groupConfigManager = groupConfigManager)
 
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index af851f3ed84..6de51d07332 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.share.SharePartitionManager
+import kafka.server.share.{ShareFetchUtils, SharePartitionManager}
 import kafka.utils.Logging
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.EndpointType
@@ -55,7 +55,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
+import org.apache.kafka.coordinator.group.{Group, GroupConfigManager, 
GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.ClientMetricsManager
@@ -104,7 +104,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
                 val apiVersionManager: ApiVersionManager,
-                val clientMetricsManager: ClientMetricsManager
+                val clientMetricsManager: ClientMetricsManager,
+                val groupConfigManager: GroupConfigManager
 ) extends ApiRequestHandler with Logging {
 
   type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
@@ -3215,6 +3216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         shareFetchRequest.data.memberId,
         params,
         shareSessionEpoch,
+        shareFetchRequest.data.maxRecords,
         shareFetchRequest.data.batchSize,
         interestedTopicPartitions
       ).thenApply{ result =>
@@ -3749,7 +3751,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       // Prepare share fetch response
       val response =
-        ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs, 
responseData, nodeEndpoints.values.toList.asJava)
+        ShareFetchResponse.of(shareFetchResponse.error, throttleTimeMs, 
responseData, nodeEndpoints.values.toList.asJava,
+          ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, 
groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs))
       // record the bytes out metrics only when the response is being sent.
       response.data.responses.forEach { topicResponse =>
         topicResponse.partitions.forEach { data =>
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index c00897eee25..b02fafe4ce4 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1044,19 +1044,19 @@ public class SharePartitionManagerTest {
         doAnswer(invocation -> 
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = 
sharePartitionManager.fetchMessages(
-            groupId, memberId1.toString(), FETCH_PARAMS, 1, BATCH_SIZE, 
topicIdPartitions);
+            groupId, memberId1.toString(), FETCH_PARAMS, 1, MAX_FETCH_RECORDS, 
BATCH_SIZE, topicIdPartitions);
         assertTrue(future.isDone());
         Mockito.verify(mockReplicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
 
-        future = sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), FETCH_PARAMS, 3, BATCH_SIZE,
-            topicIdPartitions);
+        future = sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), FETCH_PARAMS, 3,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         assertTrue(future.isDone());
         Mockito.verify(mockReplicaManager, times(2)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
 
-        future = sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), FETCH_PARAMS, 10, BATCH_SIZE,
-            topicIdPartitions);
+        future = sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), FETCH_PARAMS, 10,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         assertTrue(future.isDone());
         Mockito.verify(mockReplicaManager, times(3)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -1150,7 +1150,7 @@ public class SharePartitionManagerTest {
             for (int i = 0; i != threadCount; ++i) {
                 executorService.submit(() -> {
                     sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), FETCH_PARAMS, 0,
-                        BATCH_SIZE, topicIdPartitions);
+                        MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
                 });
                 // We are blocking the main thread at an interval of 10 
threads so that the currently running executorService threads can complete.
                 if (i % 10 == 0)
@@ -1197,7 +1197,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         Mockito.verify(mockReplicaManager, times(0)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = 
future.join();
@@ -1235,8 +1235,8 @@ public class SharePartitionManagerTest {
 
         doAnswer(invocation -> 
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
-        sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         // Since the nextFetchOffset does not point to endOffset + 1, i.e. 
some of the records in the cachedState are AVAILABLE,
         // even though the maxInFlightMessages limit is exceeded, 
replicaManager.readFromLog should be called
         Mockito.verify(mockReplicaManager, times(1)).readFromLog(
@@ -2151,7 +2151,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         // Verify that the fetch request is completed.
         TestUtils.waitForCondition(
             future::isDone,
@@ -2221,7 +2221,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         // Verify that the fetch request is completed.
         TestUtils.waitForCondition(
             future::isDone,
@@ -2283,15 +2283,15 @@ public class SharePartitionManagerTest {
         // Send 3 requests for share fetch for same share partition.
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future1 =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future2 =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future3 =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
 
         Mockito.verify(sp0, times(3)).maybeInitialize();
         Mockito.verify(mockReplicaManager, 
times(3)).addDelayedShareFetchRequest(any(), any());
@@ -2350,7 +2350,7 @@ public class SharePartitionManagerTest {
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
LeaderNotAvailableException("Leader not available")));
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2366,7 +2366,7 @@ public class SharePartitionManagerTest {
         // Return IllegalStateException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
IllegalStateException("Illegal state")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2380,7 +2380,7 @@ public class SharePartitionManagerTest {
         // Return CoordinatorNotAvailableException to simulate initialization 
failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
CoordinatorNotAvailableException("Coordinator not available")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2394,7 +2394,7 @@ public class SharePartitionManagerTest {
         // Return InvalidRequestException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
InvalidRequestException("Invalid request")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2408,7 +2408,7 @@ public class SharePartitionManagerTest {
         // Return FencedStateEpochException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
FencedStateEpochException("Fenced state epoch")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2422,7 +2422,7 @@ public class SharePartitionManagerTest {
         // Return NotLeaderOrFollowerException to simulate initialization 
failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
NotLeaderOrFollowerException("Not leader or follower")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2436,7 +2436,7 @@ public class SharePartitionManagerTest {
         // Return RuntimeException to simulate initialization failure.
         when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new 
RuntimeException("Runtime exception")));
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2472,7 +2472,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2513,7 +2513,7 @@ public class SharePartitionManagerTest {
         // Validate when exception is thrown.
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2523,7 +2523,7 @@ public class SharePartitionManagerTest {
 
         // Validate when partition is not leader.
         future = sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
@@ -2592,7 +2592,7 @@ public class SharePartitionManagerTest {
         // Validate when exception is thrown.
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         assertTrue(future.isDone());
         assertFalse(future.isCompletedExceptionally());
 
@@ -2649,7 +2649,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         validateShareFetchFutureException(future, tp0, 
Errors.UNKNOWN_SERVER_ERROR, "Exception");
         // Verify that the share partition is still in the cache on exception.
         assertEquals(1, partitionCacheMap.size());
@@ -2658,7 +2658,7 @@ public class SharePartitionManagerTest {
         doThrow(new NotLeaderOrFollowerException("Leader 
exception")).when(mockReplicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
 
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         validateShareFetchFutureException(future, tp0, 
Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
         assertTrue(partitionCacheMap.isEmpty());
         // Should have 2 fetch recorded and 2 failures.
@@ -2711,7 +2711,7 @@ public class SharePartitionManagerTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
             sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, 0,
-                BATCH_SIZE, topicIdPartitions);
+                MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         validateShareFetchFutureException(future, tp0, 
Errors.FENCED_STATE_EPOCH, "Fenced exception");
         // Verify that tp1 is still in the cache on exception.
         assertEquals(1, partitionCacheMap.size());
@@ -2726,7 +2726,7 @@ public class SharePartitionManagerTest {
         doThrow(new FencedStateEpochException("Fenced exception 
again")).when(mockReplicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
 
         future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, 0,
-            BATCH_SIZE, topicIdPartitions);
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         validateShareFetchFutureException(future, List.of(tp0, tp1), 
Errors.FENCED_STATE_EPOCH, "Fenced exception again");
         assertTrue(partitionCacheMap.isEmpty());
         // Should have 4 fetch recorded (2 fetch and 2 topics) and 3 failures 
as sp1 was not acquired
@@ -2758,7 +2758,7 @@ public class SharePartitionManagerTest {
             .build();
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = 
sharePartitionManager.fetchMessages(
-            groupId, memberId.toString(), FETCH_PARAMS, 0, BATCH_SIZE, 
topicIdPartitions);
+            groupId, memberId.toString(), FETCH_PARAMS, 0, MAX_FETCH_RECORDS, 
BATCH_SIZE, topicIdPartitions);
         assertTrue(future.isDone());
         // Validate that the listener is registered.
         verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
@@ -2821,39 +2821,39 @@ public class SharePartitionManagerTest {
         // Capture the arguments passed to processShareFetch.
         ArgumentCaptor<ShareFetch> captor = 
ArgumentCaptor.forClass(ShareFetch.class);
 
-        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 0, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 0,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         verify(sharePartitionManager, 
times(1)).processShareFetch(captor.capture());
         // Verify the partitions rotation, no rotation.
         ShareFetch resultShareFetch = captor.getValue();
         validateRotatedListEquals(resultShareFetch.topicIdPartitions(), 
topicIdPartitions, 0);
 
         // Single rotation.
-        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 1, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 1,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         verify(sharePartitionManager, 
times(2)).processShareFetch(captor.capture());
         // Verify the partitions rotation, rotate by 1.
         resultShareFetch = captor.getValue();
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 1);
 
         // Rotation by 3, less that the number of partitions.
-        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 3, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 3,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         verify(sharePartitionManager, 
times(3)).processShareFetch(captor.capture());
         // Verify the partitions rotation, rotate by 3.
         resultShareFetch = captor.getValue();
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 3);
 
         // Rotation by 12, more than the number of partitions.
-        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 12, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, 12,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         verify(sharePartitionManager, 
times(4)).processShareFetch(captor.capture());
         // Verify the partitions rotation, rotate by 5 (12 % 7).
         resultShareFetch = captor.getValue();
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 5);
         // Rotation by Integer.MAX_VALUE, boundary test.
-        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, Integer.MAX_VALUE, BATCH_SIZE,
-            topicIdPartitions);
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), 
FETCH_PARAMS, Integer.MAX_VALUE,
+            MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
         verify(sharePartitionManager, 
times(5)).processShareFetch(captor.capture());
         // Verify the partitions rotation, rotate by 1 (2147483647 % 7).
         resultShareFetch = captor.getValue();
@@ -3123,7 +3123,6 @@ public class SharePartitionManagerTest {
                 timer,
                 MAX_DELIVERY_COUNT,
                 MAX_IN_FLIGHT_MESSAGES,
-                MAX_FETCH_RECORDS,
                 persister,
                 mock(GroupConfigManager.class),
                 shareGroupMetrics,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1abf4b6297b..463a46ac121 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -75,7 +75,7 @@ import 
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
 import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -144,6 +144,7 @@ class KafkaApisTest extends Logging {
   private val fetchManager: FetchManager = mock(classOf[FetchManager])
   private val sharePartitionManager: SharePartitionManager = 
mock(classOf[SharePartitionManager])
   private val clientMetricsManager: ClientMetricsManager = 
mock(classOf[ClientMetricsManager])
+  private val groupConfigManager: GroupConfigManager = 
mock(classOf[GroupConfigManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime
@@ -209,7 +210,8 @@ class KafkaApisTest extends Logging {
       time = time,
       tokenManager = null,
       apiVersionManager = apiVersionManager,
-      clientMetricsManager = clientMetricsManager)
+      clientMetricsManager = clientMetricsManager,
+      groupConfigManager = groupConfigManager)
   }
 
   private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
@@ -3825,7 +3827,7 @@ class KafkaApisTest extends Logging {
 
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -3894,7 +3896,7 @@ class KafkaApisTest extends Logging {
 
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -3997,7 +3999,7 @@ class KafkaApisTest extends Logging {
 
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -4098,7 +4100,7 @@ class KafkaApisTest extends Logging {
     addTopicToMetadataCache(topicName, 1, topicId = topicId)
     val memberId: Uuid = Uuid.ZERO_UUID
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       FutureUtils.failedFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
     )
 
@@ -4148,7 +4150,7 @@ class KafkaApisTest extends Logging {
 
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -4222,7 +4224,7 @@ class KafkaApisTest extends Logging {
 
     val groupId = "group"
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       FutureUtils.failedFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]](Errors.UNKNOWN_SERVER_ERROR.exception())
     )
 
@@ -4285,7 +4287,7 @@ class KafkaApisTest extends Logging {
 
     val records = MemoryRecords.EMPTY
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -4348,7 +4350,7 @@ class KafkaApisTest extends Logging {
     val groupId = "group"
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, partitionIndex, topicName) ->
           new ShareFetchResponseData.PartitionData()
@@ -4438,7 +4440,7 @@ class KafkaApisTest extends Logging {
     val groupId = "group"
     val records = memoryRecords(10, 0)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -4528,7 +4530,7 @@ class KafkaApisTest extends Logging {
     val records2 = memoryRecords(10, 10)
     val records3 = memoryRecords(10, 20)
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
@@ -4752,7 +4754,7 @@ class KafkaApisTest extends Logging {
 
     val groupId = "group"
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)) ->
           new ShareFetchResponseData.PartitionData()
@@ -5211,7 +5213,7 @@ class KafkaApisTest extends Logging {
     val tp2 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
     val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         tp1 ->
           new ShareFetchResponseData.PartitionData()
@@ -5361,7 +5363,7 @@ class KafkaApisTest extends Logging {
     val tp2 = new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1))
     val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         tp1 ->
           new ShareFetchResponseData.PartitionData()
@@ -5495,7 +5497,7 @@ class KafkaApisTest extends Logging {
     val tp2 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0))
     val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         tp1 ->
           new ShareFetchResponseData.PartitionData()
@@ -5635,7 +5637,7 @@ class KafkaApisTest extends Logging {
     val tp3 = new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
     val tp4 = new TopicIdPartition(topicId3, new TopicPartition(topicName3, 0))
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         tp2 ->
           new ShareFetchResponseData.PartitionData()
@@ -5808,7 +5810,7 @@ class KafkaApisTest extends Logging {
 
     val groupId = "group"
 
-    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), any())).thenReturn(
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), anyInt(), 
anyInt(), anyInt(), any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
           new ShareFetchResponseData.PartitionData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 88cbe22769f..1f6c3755131 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1031,7 +1031,6 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>  
//ignore string
-        case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
 
         /** Streams groups configs */
         case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 68fdb40fb22..9fc3165bd7b 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -60,7 +60,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
-    assertEquals(Errors.UNSUPPORTED_VERSION.code(), 
shareFetchResponse.data().errorCode())
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, 
shareFetchResponse.data.errorCode)
+    assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
   }
 
   @ClusterTest(
@@ -75,7 +76,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, 
metadata, Map.empty)
     val shareAcknowledgeResponse = 
connectAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest)
 
-    assertEquals(Errors.UNSUPPORTED_VERSION.code(), 
shareAcknowledgeResponse.data().errorCode())
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, 
shareAcknowledgeResponse.data.errorCode)
   }
 
   @ClusterTests(
@@ -127,6 +128,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Send the share fetch request to the non-replica and verify the error 
code
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest, nonReplicaId)
+    assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs)
+
     val partitionData = 
shareFetchResponse.responseData(topicNames).get(topicIdPartition)
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
     assertEquals(leader, partitionData.currentLeader().leaderId())
@@ -184,6 +187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -258,6 +262,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
       val shareFetchResponseData = shareFetchResponse.data()
       assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
       assertEquals(1, shareFetchResponseData.responses().size())
       val partitionsCount = 
shareFetchResponseData.responses().get(0).partitions().size()
       if (partitionsCount > 0) {
@@ -376,6 +381,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData1 = shareFetchResponse1.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
+    assertEquals(30000, shareFetchResponseData1.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData1.responses().size())
     assertEquals(topicId, shareFetchResponseData1.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData1.responses().get(0).partitions().size())
@@ -383,6 +389,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData2 = shareFetchResponse2.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
+    assertEquals(30000, shareFetchResponseData2.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData2.responses().size())
     assertEquals(topicId, shareFetchResponseData2.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData2.responses().get(0).partitions().size())
@@ -472,6 +479,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -520,6 +528,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -541,7 +550,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
           new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
           new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
           new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
-          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "15000")
         )
       ),
       new ClusterTest(
@@ -553,7 +563,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
           new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
           new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
           new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
-          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true")
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true"),
+          new ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "15000")
         )
       ),
     )
@@ -573,7 +584,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
 
     // Send the first share fetch request to initialize the share partition
-    sendFirstShareFetchRequest(memberId, groupId, send)
+    sendFirstShareFetchRequest(memberId, groupId, send, 15000)
 
     initProducer()
     // Producing 10 records to the topic created above
@@ -588,6 +599,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -616,6 +628,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -640,6 +653,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(15000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -708,6 +722,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -753,6 +768,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -821,6 +837,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -865,6 +882,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
       shareFetchResponseData = shareFetchResponse.data()
       assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
       assertEquals(1, shareFetchResponseData.responses().size())
       assertEquals(topicId, 
shareFetchResponseData.responses().get(0).topicId())
       val responseSize = 
shareFetchResponseData.responses().get(0).partitions().size()
@@ -938,6 +956,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -986,6 +1005,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1054,6 +1074,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1082,6 +1103,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1106,6 +1128,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1176,6 +1199,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1221,6 +1245,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1268,6 +1293,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1534,6 +1560,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1562,6 +1589,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1587,6 +1615,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(0, shareFetchResponseData.responses().size()) // responses 
list will be empty because there are no responses for the final fetch request
   }
 
@@ -1644,6 +1673,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1672,6 +1702,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1764,6 +1795,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val shareFetchResponseData = shareFetchResponse.data()
     // The response will have a top level error code because this is an 
Initial Fetch request with acknowledgement data present
     assertEquals(Errors.INVALID_REQUEST.code(), 
shareFetchResponseData.errorCode)
+    assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
   }
 
   @ClusterTests(
@@ -1870,6 +1902,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -1946,6 +1979,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2028,6 +2062,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     var shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2105,6 +2140,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2194,6 +2230,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
       val shareFetchResponseData = shareFetchResponse.data()
       assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
       assertEquals(1, shareFetchResponseData.responses().size())
       val partitionsCount = 
shareFetchResponseData.responses().get(0).partitions().size()
       if (partitionsCount > 0) {
@@ -2220,6 +2257,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
     assertEquals(1, shareFetchResponseData.responses().size())
     assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
     assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
@@ -2234,9 +2272,143 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true")
+        )
+      )
+    )
+  )
+  def testShareFetchRequestWithMaxRecordsAndBatchSize(): Unit = {
+    val groupId: String = "group"
+    val memberId = Uuid.randomUuid
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
+    initProducer()
+    // Producing 10 records to the topic created above
+    produceData(topicIdPartition, 10)
+
+    // Send the second share fetch request to fetch the records produced above
+    val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+    val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1)
+    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+    val shareFetchResponseData = shareFetchResponse.data
+    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
+    assertEquals(1, shareFetchResponseData.responses.size)
+    assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
+    assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
+
+    val expectedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setPartitionIndex(partition)
+      .setErrorCode(Errors.NONE.code)
+      .setAcknowledgeErrorCode(Errors.NONE.code)
+      .setAcquiredRecords(expectedAcquiredRecords(util.List.of(0), 
util.List.of(0), util.List.of(1)))
+
+    val partitionData = 
shareFetchResponseData.responses.get(0).partitions.get(0)
+    compareFetchResponsePartitions(expectedPartitionData, partitionData)
+  }
+
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true")
+        )
+      )
+    )
+  )
+  def testShareFetchRequestMultipleBatchesWithMaxRecordsAndBatchSize(): Unit = 
{
+    val groupId: String = "group"
+    val memberId = Uuid.randomUuid
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
+    initProducer()
+    // Producing 10 records to the topic created above
+    produceData(topicIdPartition, 10)
+
+    // Send the second share fetch request to fetch the records produced above
+    val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
+    val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1)
+    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+    val shareFetchResponseData = shareFetchResponse.data
+    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+    assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
+    assertEquals(1, shareFetchResponseData.responses.size)
+    assertEquals(topicId, shareFetchResponseData.responses.get(0).topicId)
+    assertEquals(1, shareFetchResponseData.responses.get(0).partitions.size)
+
+    val expectedPartitionData = new ShareFetchResponseData.PartitionData()
+      .setPartitionIndex(partition)
+      .setErrorCode(Errors.NONE.code)
+      .setAcknowledgeErrorCode(Errors.NONE.code)
+      .setAcquiredRecords(expectedAcquiredRecords(util.List.of(0, 1, 2, 3, 4), 
util.List.of(0, 1, 2, 3, 4), util.List.of(1, 1, 1, 1, 1)))
+
+    val partitionData = 
shareFetchResponseData.responses.get(0).partitions.get(0)
+    compareFetchResponsePartitions(expectedPartitionData, partitionData)
+  }
+
   // For initial fetch request, the response may not be available in the first 
attempt when the share
   // partition is not initialized yet. Hence, wait for response from all 
partitions before proceeding.
-  private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, 
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
+  private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, 
topicIdPartitions: Seq[TopicIdPartition], lockTimeout: Int = 30000): Unit = {
     val partitions: util.Set[Integer] = new util.HashSet()
     TestUtils.waitUntilTrue(() => {
       val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
@@ -2245,6 +2417,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val shareFetchResponseData = shareFetchResponse.data()
 
       assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(lockTimeout, 
shareFetchResponseData.acquisitionLockTimeoutMs)
       shareFetchResponseData.responses().asScala.foreach(response => {
         if (!response.partitions().isEmpty) {
           response.partitions().forEach(partitionData => 
partitions.add(partitionData.partitionIndex))
@@ -2270,7 +2443,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                                              actualResponse: 
ShareFetchResponseData.PartitionData): Unit = {
     assertEquals(expectedResponse.partitionIndex, 
actualResponse.partitionIndex)
     assertEquals(expectedResponse.errorCode, actualResponse.errorCode)
-    assertEquals(expectedResponse.errorCode, actualResponse.errorCode)
+    assertEquals(expectedResponse.errorMessage, actualResponse.errorMessage)
     assertEquals(expectedResponse.acknowledgeErrorCode, 
actualResponse.acknowledgeErrorCode)
     assertEquals(expectedResponse.acquiredRecords, 
actualResponse.acquiredRecords)
   }
@@ -2289,8 +2462,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
                                       maxWaitMs: Int = MAX_WAIT_MS,
                                       minBytes: Int = 0,
                                       maxBytes: Int = Int.MaxValue,
+                                      maxRecords: Int = 500,
                                       batchSize: Int = 500): ShareFetchRequest 
= {
-    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, 
minBytes, maxBytes, batchSize, send.asJava, forget.asJava, 
acknowledgementsMap.asJava)
+    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, 
minBytes, maxBytes, maxRecords, batchSize, send.asJava, forget.asJava, 
acknowledgementsMap.asJava)
       .build()
   }
   
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 4e34e15fee9..e216bd4037c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
@@ -76,14 +75,8 @@ public class ShareGroupConfig {
     public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The 
class name of share persister for share group. The class should implement " +
         "the <code>org.apache.kafka.server.share.Persister</code> interface.";
 
-    // Broker temporary configuration to limit the number of records fetched 
by a share fetch request.
-    public static final String SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG = 
"share.fetch.max.fetch.records";
-    public static final int SHARE_FETCH_MAX_FETCH_RECORDS_DEFAULT = 
Integer.MAX_VALUE;
-    public static final String SHARE_FETCH_MAX_FETCH_RECORDS_DOC = "The 
maximum number of records that can be fetched by a share fetch request.";
-
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
-            .defineInternal(SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG, INT, 
SHARE_FETCH_MAX_FETCH_RECORDS_DEFAULT, null, HIGH, 
SHARE_FETCH_MAX_FETCH_RECORDS_DOC)
             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
             .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM, 
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
             .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
@@ -102,7 +95,6 @@ public class ShareGroupConfig {
     private final int shareGroupMinRecordLockDurationMs;
     private final int shareFetchPurgatoryPurgeIntervalRequests;
     private final String shareGroupPersisterClassName;
-    private final int shareFetchMaxFetchRecords;
 
     public ShareGroupConfig(AbstractConfig config) {
         // Share groups are enabled in two cases: 1) The internal 
configuration to enable it is
@@ -119,7 +111,6 @@ public class ShareGroupConfig {
         shareGroupMinRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
         shareFetchPurgatoryPurgeIntervalRequests = 
config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
         shareGroupPersisterClassName = 
config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
-        shareFetchMaxFetchRecords = 
config.getInt(ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG);
         validate();
     }
 
@@ -160,10 +151,6 @@ public class ShareGroupConfig {
         return shareGroupPersisterClassName;
     }
 
-    public int shareFetchMaxFetchRecords() {
-        return shareFetchMaxFetchRecords;
-    }
-
     private void validate() {
         Utils.require(shareGroupRecordLockDurationMs >= 
shareGroupMinRecordLockDurationMs,
                 String.format("%s must be greater than or equal to %s",
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 31d88091576..b4779a5fd3b 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -119,6 +120,7 @@ public class KRaftMetadataRequestBenchmark {
     private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
     private final SharePartitionManager sharePartitionManager = 
Mockito.mock(SharePartitionManager.class);
     private final ClientMetricsManager clientMetricsManager = 
Mockito.mock(ClientMetricsManager.class);
+    private final GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
     private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
     private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     @Param({"500", "1000",  "5000"})
@@ -206,6 +208,7 @@ public class KRaftMetadataRequestBenchmark {
                         ApiMessageType.ListenerType.BROKER,
                         false,
                         () -> 
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
+                setGroupConfigManager(groupConfigManager).
                 build();
     }
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java 
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index 1be37ae87b7..9abc5591ddd 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -55,7 +55,7 @@ public class FinalContext extends ShareFetchContext {
                                                      
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         log.debug("Final context returning {}", 
partitionsToLogString(updates.keySet()));
         return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
-                updates.entrySet().iterator(), List.of()));
+                updates.entrySet().iterator(), List.of(), 0));
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 36062616cec..36bf67ad828 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -51,7 +51,7 @@ public abstract class ShareFetchContext {
      */
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
         return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), List.of()));
+                Collections.emptyIterator(), List.of(), 0));
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index 99eb92a85b9..c3d177c8808 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -103,7 +103,7 @@ public class ShareSessionContext extends ShareFetchContext {
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
         if (!isSubsequent) {
             return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                    Collections.emptyIterator(), List.of()));
+                    Collections.emptyIterator(), List.of(), 0));
         }
         int expectedEpoch = 
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
         int sessionEpoch;
@@ -114,10 +114,10 @@ public class ShareSessionContext extends 
ShareFetchContext {
             log.debug("Subsequent share session {} expected epoch {}, but got 
{}. " +
                     "Possible duplicate request.", session.key(), 
expectedEpoch, sessionEpoch);
             return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                    throttleTimeMs, Collections.emptyIterator(), List.of()));
+                    throttleTimeMs, Collections.emptyIterator(), List.of(), 
0));
         }
         return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), List.of()));
+                Collections.emptyIterator(), List.of(), 0));
     }
 
     /**
@@ -196,7 +196,7 @@ public class ShareSessionContext extends ShareFetchContext {
                                                      
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         if (!isSubsequent) {
             return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
+                    Errors.NONE, 0, updates.entrySet().iterator(), List.of(), 
0));
         } else {
             int expectedEpoch = 
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
             int sessionEpoch;
@@ -207,7 +207,7 @@ public class ShareSessionContext extends ShareFetchContext {
                 log.debug("Subsequent share session {} expected epoch {}, but 
got {}. Possible duplicate request.",
                         session.key(), expectedEpoch, sessionEpoch);
                 return new 
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                        0, Collections.emptyIterator(), List.of()));
+                        0, Collections.emptyIterator(), List.of(), 0));
             }
             // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
             Iterator<Map.Entry<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> partitionIterator = new 
PartitionIterator(
@@ -218,7 +218,7 @@ public class ShareSessionContext extends ShareFetchContext {
             log.debug("Subsequent share session context with session key {} 
returning {}", session.key(),
                     partitionsToLogString(updates.keySet()));
             return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
+                    Errors.NONE, 0, updates.entrySet().iterator(), List.of(), 
0));
         }
     }
 

Reply via email to