This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3da8f500eb KAFKA-18936: Fix share fetch when records are larger than 
max bytes (#19145)
f3da8f500eb is described below

commit f3da8f500eb3ecd64440bf72ad46e8cf89eb5f68
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Mar 12 09:03:35 2025 +0000

    KAFKA-18936: Fix share fetch when records are larger than max bytes (#19145)
    
    This PR fixes the share fetch behaviour when the records being fetched
    are larger than the `fetch.max.bytes` config.
    
    `hardMaxBytesLimit` is used in ReplicaManager, where it decides whether
    to fetch a single record or not. The file records get sliced based on
    the bytes requested. However, if `hardMaxBytesLimit` is false, then at
    least one record is fetched and the bytes are adjusted accordingly in
    `localLog`.
    
    Reviewers: Jun Rao <[email protected]>, Andrew Schofield 
<[email protected]>, Abhinav Dixit <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  5 +----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  3 ---
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |  1 -
 .../main/scala/kafka/server/ReplicaManager.scala   | 10 +++++-----
 .../kafka/log/remote/RemoteLogManagerTest.java     |  8 ++++----
 .../java/kafka/log/remote/RemoteLogReaderTest.java |  4 ++--
 .../kafka/server/share/DelayedShareFetchTest.java  | 15 +++++++-------
 .../kafka/server/share/ShareFetchUtilsTest.java    |  3 +--
 .../server/share/SharePartitionManagerTest.java    |  2 +-
 .../java/kafka/test/api/ShareConsumerTest.java     | 19 ++++++++++++++++++
 .../kafka/server/DelayedFetchTest.scala            |  3 +--
 .../kafka/server/DelayedRemoteFetchTest.scala      | 13 ++++++------
 .../unit/kafka/cluster/PartitionLockTest.scala     |  2 --
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  2 --
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  3 +--
 .../server/ReplicaManagerConcurrencyTest.scala     |  3 +--
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  3 ---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 13 ++++++------
 .../kafka/server/storage/log/FetchParams.java      | 23 ++++++----------------
 .../internals/log/RemoteStorageFetchInfo.java      |  6 +-----
 20 files changed, 62 insertions(+), 79 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5bdc9f34c4a..250e064e059 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1716,10 +1716,7 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
             // An empty record is sent instead of an incomplete batch when
             //  - there is no minimum-one-message constraint and
             //  - the first batch size is more than maximum bytes that can be 
sent and
-            //  - for FetchRequest version 3 or above.
-            if (!remoteStorageFetchInfo.minOneMessage &&
-                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
-                    firstBatchSize > maxBytes) {
+            if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes) {
                 return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
             }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e77a67b35b3..02c1cb5b73b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -723,7 +723,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       val params = new FetchParams(
-        versionId,
         fetchRequest.replicaId,
         fetchRequest.replicaEpoch,
         fetchRequest.maxWait,
@@ -3153,7 +3152,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val shareFetchRequest = request.body[ShareFetchRequest]
 
     val clientId = request.header.clientId
-    val versionId = request.header.apiVersion
     val groupId = shareFetchRequest.data.groupId
 
     if (interestedWithMaxBytes.isEmpty) {
@@ -3176,7 +3174,6 @@ class KafkaApis(val requestChannel: RequestChannel,
           request.context.listenerName.value))
 
       val params = new FetchParams(
-        versionId,
         FetchRequest.CONSUMER_REPLICA_ID,
         -1,
         shareFetchRequest.maxWait,
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala 
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 1e2a6cd033e..22d3ba2c0c1 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -92,7 +92,6 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     val fetchData = request.fetchData(topicNames.asJava)
 
     val fetchParams = new FetchParams(
-      request.version,
       FetchRequest.FUTURE_LOCAL_REPLICA_ID,
       -1,
       0L, // timeout is 0 so that the callback will be executed immediately
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 885cfc82be5..9fdd42b3174 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1685,9 +1685,9 @@ class ReplicaManager(val config: KafkaConfig,
       if (params.isFromFollower && shouldLeaderThrottle(quota, partition, 
params.replicaId)) {
         // If the partition is being throttled, simply return an empty set.
         new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, 
MemoryRecords.EMPTY)
-      } else if (!params.hardMaxBytesLimit && 
givenFetchedDataInfo.firstEntryIncomplete) {
-        // For FetchRequest version 3, we replace incomplete message sets with 
an empty one as consumers can make
-        // progress in such cases and don't need to report a 
`RecordTooLargeException`
+      } else if (givenFetchedDataInfo.firstEntryIncomplete) {
+        // Replace incomplete message sets with an empty one as consumers can 
make progress in such
+        // cases and don't need to report a `RecordTooLargeException`
         new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, 
MemoryRecords.EMPTY)
       } else {
         givenFetchedDataInfo
@@ -1799,7 +1799,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     var limitBytes = params.maxBytes
     val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
-    var minOneMessage = !params.hardMaxBytesLimit
+    var minOneMessage = true
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
       val recordBatchSize = readResult.info.records.sizeInBytes
@@ -1855,7 +1855,7 @@ class ReplicaManager(val config: KafkaConfig,
           // For the first topic-partition that needs remote data, we will use 
this information to read the data in another thread.
           new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false, Optional.empty(),
             Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, 
minOneMessage, tp.topicPartition(),
-              fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+              fetchInfo, params.isolation)))
         }
 
         LogReadResult(fetchDataInfo,
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 272c78fce84..acb22dd5765 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -3129,7 +3129,7 @@ public class RemoteLogManagerTest {
         );
 
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, 
false
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3206,7 +3206,7 @@ public class RemoteLogManagerTest {
         );
 
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, minOneMessage, tp, partitionData, 
FetchIsolation.HIGH_WATERMARK, false
+                0, minOneMessage, tp, partitionData, 
FetchIsolation.HIGH_WATERMARK
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3290,7 +3290,7 @@ public class RemoteLogManagerTest {
         when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
         doNothing().when(firstBatch).writeTo(capture.capture());
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK, 
false
+                0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
         );
 
 
@@ -3674,7 +3674,7 @@ public class RemoteLogManagerTest {
                 Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
         RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(
                 1048576, true, leaderTopicIdPartition.topicPartition(),
-                partitionData, FetchIsolation.HIGH_WATERMARK, false);
+                partitionData, FetchIsolation.HIGH_WATERMARK);
         FetchDataInfo fetchDataInfo = 
remoteLogManager.read(remoteStorageFetchInfo);
         // firstBatch baseOffset may not be equal to the fetchOffset
         assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 400cf3c2dff..53905e6d114 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -70,7 +70,7 @@ public class RemoteLogReaderTest {
         
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, 
false);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
         RemoteLogReader remoteLogReader =
                 new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, 
brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
@@ -103,7 +103,7 @@ public class RemoteLogReaderTest {
         when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new 
RuntimeException("error"));
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, 
false);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
         RemoteLogReader remoteLogReader =
                 new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, 
brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 27aae04f176..aa0e855d931 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareFetchResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
@@ -96,7 +95,7 @@ public class DelayedShareFetchTest {
     private static final int MAX_WAIT_MS = 5000;
     private static final int BATCH_SIZE = 500;
     private static final int MAX_FETCH_RECORDS = 100;
-    private static final FetchParams FETCH_PARAMS = new 
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+    private static final FetchParams FETCH_PARAMS = new FetchParams(
         FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK,
         Optional.empty(), true);
     private static final BrokerTopicStats BROKER_TOPIC_STATS = new 
BrokerTopicStats();
@@ -180,7 +179,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp1, sp1);
 
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 
MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
@@ -253,7 +252,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp1, sp1);
 
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 
MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
@@ -627,7 +626,7 @@ public class DelayedShareFetchTest {
 
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, 
BROKER_TOPIC_STATS);
 
@@ -684,7 +683,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp0, sp0);
 
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 
MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
@@ -866,7 +865,7 @@ public class DelayedShareFetchTest {
         CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
 
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, 
BROKER_TOPIC_STATS);
 
@@ -1103,7 +1102,7 @@ public class DelayedShareFetchTest {
         sharePartitions.put(tp2, sp2);
 
         ShareFetch shareFetch = new ShareFetch(
-            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+            new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 
MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 6baa3b05b53..f4bd75971ee 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.FencedLeaderEpochException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -80,7 +79,7 @@ import static org.mockito.Mockito.when;
 
 public class ShareFetchUtilsTest {
 
-    private static final FetchParams FETCH_PARAMS = new 
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+    private static final FetchParams FETCH_PARAMS = new FetchParams(
         FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK,
         Optional.empty(), true);
     private static final int BATCH_SIZE = 500;
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 4f550c6751b..832303a58ef 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -152,7 +152,7 @@ public class SharePartitionManagerTest {
     private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
     private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
     private static final int BATCH_SIZE = 500;
-    private static final FetchParams FETCH_PARAMS = new 
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+    private static final FetchParams FETCH_PARAMS = new FetchParams(
         FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
     private static final String TIMER_NAME_PREFIX = "share-partition-manager";
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index f485e4f816e..44509f7bb26 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -289,6 +289,25 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testPollRecordsGreaterThanMaxBytes() {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1))
+        ) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
     @ClusterTest
     public void testAcknowledgementSentOnSubscriptionChange() throws 
ExecutionException, InterruptedException {
         setup();
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 34000b44173..9c5fcf90779 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -22,7 +22,7 @@ import kafka.cluster.Partition
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import org.apache.kafka.common.errors.{FencedLeaderEpochException, 
NotLeaderOrFollowerException}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
@@ -230,7 +230,6 @@ class DelayedFetchTest {
     minBytes: Int = 1,
   ): FetchParams = {
     new FetchParams(
-      ApiKeys.FETCH.latestVersion,
       replicaId,
       1,
       maxWaitMs,
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index 264f5310c2d..424c8cc04ec 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import kafka.cluster.Partition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -61,7 +61,7 @@ class DelayedRemoteFetchTest {
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
     future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
     val highWatermark = 100
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
@@ -97,7 +97,7 @@ class DelayedRemoteFetchTest {
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
     future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
     val highWatermark = 100
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
@@ -122,7 +122,7 @@ class DelayedRemoteFetchTest {
       .thenThrow(new NotLeaderOrFollowerException(s"Replica for 
$topicIdPartition not available"))
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
 
     val logReadInfo = buildReadResult(Errors.NONE)
 
@@ -152,7 +152,7 @@ class DelayedRemoteFetchTest {
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
     future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
 
     // build a read result with error
     val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
@@ -183,7 +183,7 @@ class DelayedRemoteFetchTest {
 
     val remoteFetchTask = mock(classOf[Future[Void]])
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
 
     val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, 
fetchInfo, remoteFetchMaxWaitMs,
@@ -220,7 +220,6 @@ class DelayedRemoteFetchTest {
   private def buildFetchParams(replicaId: Int,
                                maxWaitMs: Int): FetchParams = {
     new FetchParams(
-      ApiKeys.FETCH.latestVersion,
       replicaId,
       1,
       maxWaitMs,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 356496fe9d4..a74cd745627 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -25,7 +25,6 @@ import kafka.log._
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.{FetchRequest, LeaderAndIsrRequest}
 import org.apache.kafka.common.utils.Utils
@@ -395,7 +394,6 @@ class PartitionLockTest extends Logging {
 
     while (fetchOffset < numRecords) {
       val fetchParams = new FetchParams(
-        ApiKeys.FETCH.latestVersion,
         followerId,
         1,
         0L,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1bb32f1d6c2..837f3abd93b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -129,7 +129,6 @@ object PartitionTest {
     maxBytes: Int = Int.MaxValue
   ): FetchParams = {
     new FetchParams(
-      ApiKeys.FETCH.latestVersion,
       replicaId,
       replicaEpoch,
       maxWaitMs,
@@ -148,7 +147,6 @@ object PartitionTest {
     isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
   ): FetchParams = {
     new FetchParams(
-      ApiKeys.FETCH.latestVersion,
       FetchRequest.CONSUMER_REPLICA_ID,
       -1,
       maxWaitMs,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index f60d0f0e3fd..be4152a0a6d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.KafkaStorageException
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
@@ -503,7 +503,6 @@ class ReplicaAlterLogDirsThreadTest {
       ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit])
 
     val expectedFetchParams = new FetchParams(
-      ApiKeys.FETCH.latestVersion,
       FetchRequest.FUTURE_LOCAL_REPLICA_ID,
       -1,
       0L,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 98e872d331d..a7395dccde3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.SimpleRecord
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
@@ -253,7 +253,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
       }
 
       val fetchParams = new FetchParams(
-        ApiKeys.FETCH.latestVersion,
         replicaId,
         defaultBrokerEpoch(replicaId),
         random.nextInt(100),
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f0a4be811bb..d48a35637e4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils._
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -175,7 +174,6 @@ class ReplicaManagerQuotasTest {
         new LogOffsetMetadata(50L, 0L, 250),
         new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = new FetchParams(
-        ApiKeys.FETCH.latestVersion,
         1,
         1,
         600,
@@ -227,7 +225,6 @@ class ReplicaManagerQuotasTest {
         new LogOffsetMetadata(50L, 0L, 250),
         new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
       val fetchParams = new FetchParams(
-        ApiKeys.FETCH.latestVersion,
         FetchRequest.CONSUMER_REPLICA_ID,
         -1,
         600L,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59d9b4b1a63..32677f7c4c2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3169,7 +3169,6 @@ class ReplicaManagerTest {
     clientMetadata: Option[ClientMetadata] = None
   ): Unit = {
     val params = new FetchParams(
-      requestVersion,
       replicaId,
       1,
       maxWaitMs,
@@ -3670,7 +3669,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
 
-      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 
1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+      val params = new FetchParams(replicaId, 1, 1000, 0, 100, 
FetchIsolation.LOG_END, None.asJava)
       // when reading log, it'll throw OffsetOutOfRangeException, which will 
be handled separately
       val result = replicaManager.readFromLog(params, Seq(tidp0 -> new 
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), 
Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
 
@@ -3725,7 +3724,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
 
-      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
       val fetchOffset = 1
 
       def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
@@ -3817,7 +3816,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
 
-      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
       val fetchOffset = 1
       val responseLatch = new CountDownLatch(5)
 
@@ -3943,7 +3942,7 @@ class ReplicaManagerTest {
         endOffsetMetadata,
         endOffsetMetadata))
 
-      val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
       val fetchOffset = 1
 
       def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
@@ -6092,7 +6091,7 @@ class ReplicaManagerTest {
 
       val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]]
       val shareFetch = new ShareFetch(
-        new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+        new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
         groupId,
         Uuid.randomUuid.toString,
         future,
@@ -6159,7 +6158,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
 
-      val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
+      val params = new FetchParams(-1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
       replicaManager.readFromLog(
         params,
         Seq(new TopicIdPartition(topicId, 0, topic) -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of(leaderEpoch))),
diff --git a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
index 9829ce76aed..5de55023580 100644
--- a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
+++ b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
@@ -25,7 +25,6 @@ import java.util.Optional;
 import static org.apache.kafka.common.requests.FetchRequest.FUTURE_LOCAL_REPLICA_ID;
 
 public class FetchParams {
-    public final short requestVersion;
     public final int replicaId;
     public final long replicaEpoch;
     public final long maxWaitMs;
@@ -35,19 +34,17 @@ public class FetchParams {
     public final Optional<ClientMetadata> clientMetadata;
     public final boolean shareFetchRequest;
 
-    public FetchParams(short requestVersion,
-                       int replicaId,
+    public FetchParams(int replicaId,
                        long replicaEpoch,
                        long maxWaitMs,
                        int minBytes,
                        int maxBytes,
                        FetchIsolation isolation,
                        Optional<ClientMetadata> clientMetadata) {
-        this(requestVersion, replicaId, replicaEpoch, maxWaitMs, minBytes, maxBytes, isolation, clientMetadata, false);
+        this(replicaId, replicaEpoch, maxWaitMs, minBytes, maxBytes, isolation, clientMetadata, false);
     }
 
-    public FetchParams(short requestVersion,
-                       int replicaId,
+    public FetchParams(int replicaId,
                        long replicaEpoch,
                        long maxWaitMs,
                        int minBytes,
@@ -57,7 +54,6 @@ public class FetchParams {
                        boolean shareFetchRequest) {
         Objects.requireNonNull(isolation);
         Objects.requireNonNull(clientMetadata);
-        this.requestVersion = requestVersion;
         this.replicaId = replicaId;
         this.replicaEpoch = replicaEpoch;
         this.maxWaitMs = maxWaitMs;
@@ -84,17 +80,12 @@ public class FetchParams {
         return isFromFollower() || (isFromConsumer() && clientMetadata.isEmpty()) || shareFetchRequest;
     }
 
-    public boolean hardMaxBytesLimit() {
-        return requestVersion <= 2;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         FetchParams that = (FetchParams) o;
-        return requestVersion == that.requestVersion
-                && replicaId == that.replicaId
+        return replicaId == that.replicaId
                 && replicaEpoch == that.replicaEpoch
                 && maxWaitMs == that.maxWaitMs
                 && minBytes == that.minBytes
@@ -106,8 +97,7 @@ public class FetchParams {
 
     @Override
     public int hashCode() {
-        int result = requestVersion;
-        result = 31 * result + replicaId;
+        int result = replicaId;
         result = 31 * result + (int) replicaEpoch;
         result = 31 * result + Long.hashCode(32);
         result = 31 * result + minBytes;
@@ -121,8 +111,7 @@ public class FetchParams {
     @Override
     public String toString() {
         return "FetchParams(" +
-                "requestVersion=" + requestVersion +
-                ", replicaId=" + replicaId +
+                "replicaId=" + replicaId +
                 ", replicaEpoch=" + replicaEpoch +
                 ", maxWaitMs=" + maxWaitMs +
                 ", minBytes=" + minBytes +
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
index cf908afce83..c110e750d7c 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -27,17 +27,14 @@ public class RemoteStorageFetchInfo {
     public final TopicPartition topicPartition;
     public final FetchRequest.PartitionData fetchInfo;
     public final FetchIsolation fetchIsolation;
-    public final boolean hardMaxBytesLimit;
 
     public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
-                                  FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation,
-                                  boolean hardMaxBytesLimit) {
+                                  FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation) {
         this.fetchMaxBytes = fetchMaxBytes;
         this.minOneMessage = minOneMessage;
         this.topicPartition = topicPartition;
         this.fetchInfo = fetchInfo;
         this.fetchIsolation = fetchIsolation;
-        this.hardMaxBytesLimit = hardMaxBytesLimit;
     }
 
     @Override
@@ -48,7 +45,6 @@ public class RemoteStorageFetchInfo {
                 ", topicPartition=" + topicPartition +
                 ", fetchInfo=" + fetchInfo +
                 ", fetchIsolation=" + fetchIsolation +
-                ", hardMaxBytesLimit=" + hardMaxBytesLimit +
                 '}';
     }
 }


Reply via email to