This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 742b327025f KAFKA-14145; Faster KRaft HWM replication (#19800)
742b327025f is described below

commit 742b327025f102770054756f99e38b692efd42d4
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jun 17 13:00:43 2025 -0400

    KAFKA-14145; Faster KRaft HWM replication (#19800)
    
    This change compares the remote replica's HWM with the leader's HWM and
    completes the FETCH request immediately if the remote HWM is less than
    the leader's HWM. When the leader's HWM is updated, any pending FETCH
    RPC is completed.
    
    Reviewers: Alyssa Huang <[email protected]>, David Arthur
     <[email protected]>, Andrew Schofield <[email protected]>
---
 .../resources/common/message/FetchRequest.json     |   9 +-
 .../resources/common/message/FetchResponse.json    |   4 +-
 .../controller/FeatureControlManagerTest.java      |  26 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  85 ++++--
 .../kafka/raft/KafkaRaftClientClusterAuthTest.java |   2 +-
 .../kafka/raft/KafkaRaftClientFetchTest.java       | 331 ++++++++++++++++++++-
 .../kafka/raft/KafkaRaftClientPreVoteTest.java     |   6 +-
 .../kafka/raft/KafkaRaftClientReconfigTest.java    |  36 +--
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    |  82 +++--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 116 ++++----
 .../apache/kafka/raft/RaftClientTestContext.java   |  78 +++--
 .../kafka/server/common/MetadataVersion.java       |  17 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  12 +-
 13 files changed, 641 insertions(+), 163 deletions(-)

diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index b7ad185f60b..b4dd880faed 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -56,7 +56,9 @@
   // Version 16 is the same as version 15 (KIP-951).
   //
   // Version 17 adds directory id support from KIP-853
-  "validVersions": "4-17",
+  //
+  // Version 18 adds high-watermark from KIP-1166
+  "validVersions": "4-18",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
@@ -103,7 +105,10 @@
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
           "about": "The maximum bytes to fetch from this partition.  See 
KIP-74 for cases where this limit may not be honored." },
         { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", 
"taggedVersions": "17+", "tag": 0, "ignorable": true,
-          "about": "The directory id of the follower fetching." }
+          "about": "The directory id of the follower fetching." },
+        { "name": "HighWatermark", "type": "int64", "versions": "18+", 
"default": "9223372036854775807", "taggedVersions": "18+",
+          "tag": 1, "ignorable": true,
+          "about": "The high-watermark known by the replica. -1 if the 
high-watermark is not known and 9223372036854775807 if the feature is not 
supported." }
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": 
"7+", "ignorable": false,
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index dc8d3517566..36dc05ff60c 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -48,7 +48,9 @@
   // Version 16 adds the 'NodeEndpoints' field (KIP-951).
   //
   // Version 17 no changes to the response (KIP-853).
-  "validVersions": "4-17",
+  //
+  // Version 18 no changes to the response (KIP-1166)
+  "validVersions": "4-18",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 5ff9cff626d..774aff96510 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -394,13 +394,25 @@ public class FeatureControlManagerTest {
                 MetadataVersion.MINIMUM_VERSION.featureLevel(), 
MetadataVersion.latestTesting().featureLevel())).
             build();
         manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
-        assertEquals(ControllerResult.of(List.of(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-28")),
-                manager.updateFeatures(
-                        Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
-                        Map.of(MetadataVersion.FEATURE_NAME, 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
-                        true,
-                        0));
+        assertEquals(
+            ControllerResult.of(
+                List.of(),
+                new ApiError(
+                    Errors.INVALID_UPDATE_VERSION,
+                    String.format(
+                        "Invalid update version 6 for feature 
metadata.version. Local controller 0 only supports versions %s-%s",
+                        MetadataVersion.MINIMUM_VERSION.featureLevel(),
+                        MetadataVersion.latestTesting().featureLevel()
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
+                Map.of(MetadataVersion.FEATURE_NAME, 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                true,
+                0
+            )
+        );
     }
 
     @Test
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 760d6b8c159..970ee2d6910 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
@@ -386,6 +385,11 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // records still held in memory directly to the listener
             appendPurgatory.maybeComplete(highWatermark.offset(), 
currentTimeMs);
 
+            // After updating the high-watermark, complete all of the deferred
+            // fetch requests. This is always correct because all fetch request
+            // deferred have a HWM less or equal to the previous leader's HWM.
+            fetchPurgatory.completeAll(currentTimeMs);
+
             // It is also possible that the high watermark is being updated
             // for the first time following the leader election, so we need
             // to give lagging listeners an opportunity to catch up as well
@@ -741,7 +745,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
     private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
         fetchPurgatory.completeAllExceptionally(
-            Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request 
since this node is resigning"));
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Not handling request since this node is resigning"
+            )
+        );
         quorum.transitionToResigned(preferredSuccessors);
         resetConnections();
     }
@@ -753,12 +760,18 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         // After becoming a follower, we need to complete all pending fetches 
so that
         // they can be re-sent to the leader without waiting for their 
expirations
-        fetchPurgatory.completeAllExceptionally(new 
NotLeaderOrFollowerException(
-            "Cannot process the fetch request because the node is no longer 
the leader."));
+        fetchPurgatory.completeAllExceptionally(
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Cannot process the fetch request because the node is no 
longer the leader"
+            )
+        );
 
         // Clearing the append purgatory should complete all futures 
exceptionally since this node is no longer the leader
-        appendPurgatory.completeAllExceptionally(new 
NotLeaderOrFollowerException(
-            "Failed to receive sufficient acknowledgments for this append 
before leader change."));
+        appendPurgatory.completeAllExceptionally(
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Failed to receive sufficient acknowledgments for this append 
before leader change"
+            )
+        );
     }
 
     private void transitionToFollower(
@@ -1514,19 +1527,22 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             || FetchResponse.recordsSize(partitionResponse) > 0
             || request.maxWaitMs() == 0
             || isPartitionDiverged(partitionResponse)
-            || isPartitionSnapshotted(partitionResponse)) {
+            || isPartitionSnapshotted(partitionResponse)
+            || isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
             // Reply immediately if any of the following is true
             // 1. The response contains an error
             // 2. There are records in the response
             // 3. The fetching replica doesn't want to wait for the partition 
to contain new data
             // 4. The fetching replica needs to truncate because the log 
diverged
             // 5. The fetching replica needs to fetch a snapshot
+            // 6. The fetching replica should update its high-watermark
             return completedFuture(response);
         }
 
         CompletableFuture<Long> future = fetchPurgatory.await(
             fetchPartition.fetchOffset(),
-            request.maxWaitMs());
+            request.maxWaitMs()
+        );
 
         return future.handle((completionTimeMs, exception) -> {
             if (exception != null) {
@@ -1556,26 +1572,25 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                         Optional.empty()
                     );
                 }
-            }
-
-            // FIXME: `completionTimeMs`, which can be null
-            logger.trace(
-                "Completing delayed fetch from {} starting at offset {} at {}",
-                replicaKey,
-                fetchPartition.fetchOffset(),
-                completionTimeMs
-            );
+            } else {
+                logger.trace(
+                    "Completing delayed fetch from {} starting at offset {} at 
{}",
+                    replicaKey,
+                    fetchPartition.fetchOffset(),
+                    completionTimeMs
+                );
 
-            // It is safe to call tryCompleteFetchRequest because only the 
polling thread completes this
-            // future successfully. This is true because only the polling 
thread appends record batches to
-            // the log from maybeAppendBatches.
-            return tryCompleteFetchRequest(
-                requestMetadata.listenerName(),
-                requestMetadata.apiVersion(),
-                replicaKey,
-                fetchPartition,
-                time.milliseconds()
-            );
+                // It is safe to call tryCompleteFetchRequest because only the 
polling thread completes
+                // this future successfully. The future is completed 
successfully either because of an
+                // append (maybeAppendBatches) or because the HWM was updated 
(onUpdateLeaderHighWatermark)
+                return tryCompleteFetchRequest(
+                    requestMetadata.listenerName(),
+                    requestMetadata.apiVersion(),
+                    replicaKey,
+                    fetchPartition,
+                    completionTimeMs
+                );
+            }
         });
     }
 
@@ -1633,18 +1648,29 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
     }
 
-    private static boolean isPartitionDiverged(FetchResponseData.PartitionData 
partitionResponseData) {
+    private static boolean isPartitionDiverged(
+        FetchResponseData.PartitionData partitionResponseData
+    ) {
         FetchResponseData.EpochEndOffset divergingEpoch = 
partitionResponseData.divergingEpoch();
 
         return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != 
-1;
     }
 
-    private static boolean 
isPartitionSnapshotted(FetchResponseData.PartitionData partitionResponseData) {
+    private static boolean isPartitionSnapshotted(
+        FetchResponseData.PartitionData partitionResponseData
+    ) {
         FetchResponseData.SnapshotId snapshotId = 
partitionResponseData.snapshotId();
 
         return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1;
     }
 
+    private static boolean isHighWatermarkUpdated(
+        FetchResponseData.PartitionData partitionResponseData,
+        FetchRequestData.FetchPartition partitionRequestData
+    ) {
+        return partitionRequestData.highWatermark() < 
partitionResponseData.highWatermark();
+    }
+
     private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
         if (leaderIdOrNil < 0)
             return OptionalInt.empty();
@@ -2882,6 +2908,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 .setLastFetchedEpoch(log.lastFetchedEpoch())
                 .setFetchOffset(log.endOffset().offset())
                 .setReplicaDirectoryId(quorum.localDirectoryId())
+                
.setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))
         );
 
         return request
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
index 62c8e117698..5fa6a0781a8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
@@ -53,7 +53,7 @@ public class KafkaRaftClientClusterAuthTest {
 
         context.pollUntilRequest();
 
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0, 0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0, 0, context.client.highWatermark());
         FetchResponseData response = new FetchResponseData()
                 .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
         context.deliverResponse(
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index ade509d8051..cfbc03070bb 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ArbitraryMemoryRecords;
 import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
@@ -33,7 +34,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -72,7 +76,7 @@ public final class KafkaRaftClientFetchTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
OptionalLong.empty());
 
         long oldLogEndOffset = context.log.endOffset().offset();
 
@@ -107,7 +111,7 @@ public final class KafkaRaftClientFetchTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
OptionalLong.empty());
 
         long oldLogEndOffset = context.log.endOffset().offset();
         int numberOfRecords = 10;
@@ -149,4 +153,327 @@ public final class KafkaRaftClientFetchTest {
         // Check that only the first batch was appended because the second 
batch has a greater epoch
         assertEquals(oldLogEndOffset + numberOfRecords, 
context.log.endOffset().offset());
     }
+
+    @Test
+    void testHighWatermarkSentInFetchRequest() throws Exception {
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.empty()
+        );
+
+        // Set the HWM to the LEO
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(
+                epoch,
+                electedLeader.id(),
+                MemoryRecords.EMPTY,
+                localLogEndOffset,
+                Errors.NONE
+            )
+        );
+
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            epoch,
+            OptionalLong.of(localLogEndOffset)
+        );
+    }
+
+    @Test
+    void testDefaultHwmDeferred() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {
+            context.client.poll();
+            assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+        }
+    }
+
+    @Test
+    void testUnknownHwmDeferredWhenLeaderDoesNotKnowHwm() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                OptionalLong.empty(),
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {
+            context.client.poll();
+            assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+        }
+    }
+
+    @Test
+    void testOutdatedHwmCompletedWhenLeaderKnowsHwm() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+
+        // FETCH response completed when remote replica doesn't know HWM
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                OptionalLong.empty(),
+                Integer.MAX_VALUE
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
+
+        // FETCH response completed when remote replica has outdated HWM
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                OptionalLong.of(localLogEndOffset - 1),
+                Integer.MAX_VALUE
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
+    }
+
+    @Test
+    void testUnchangedHighWatermarkDeferred() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                OptionalLong.of(localLogEndOffset),
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {
+            context.client.poll();
+            assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+        }
+    }
+
+    @Test
+    void testUpdatedHighWatermarkCompleted() throws Exception {
+        var epoch = 2;
+        var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "b", "c"))
+            .appendToLog(epoch, List.of("d", "e", "f"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, voter)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Establish a HWM (3) but don't set it to the LEO
+        context.deliverRequest(context.fetchRequest(epoch, voter, 3L, 2, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        var localLogEndOffset = context.log.endOffset().offset();
+        var lastFetchedEpoch = context.log.lastFetchedEpoch();
+        context.deliverRequest(
+            context.fetchRequest(
+                epoch,
+                remote,
+                localLogEndOffset,
+                lastFetchedEpoch,
+                OptionalLong.of(localLogEndOffset),
+                Integer.MAX_VALUE
+            )
+        );
+
+        // Check that the fetch response was deferred
+        for (var i = 0; i < 10; ++i) {
+            context.client.poll();
+            assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+        }
+
+        // Update the HWM and complete the deferred FETCH response
+        context.deliverRequest(
+            context.fetchRequest(epoch, voter, localLogEndOffset, 
lastFetchedEpoch, 0)
+        );
+        context.pollUntilResponse();
+
+        // Check that two fetch requests were completed
+        var fetchResponses = context.drainSentResponses(ApiKeys.FETCH);
+        for (var fetchResponse : fetchResponses) {
+            var partitionResponse = 
context.assertFetchResponseData(fetchResponse);
+            assertEquals(Errors.NONE, 
Errors.forCode(partitionResponse.errorCode()));
+            assertEquals(epoch, 
partitionResponse.currentLeader().leaderEpoch());
+            assertEquals(localLogEndOffset, partitionResponse.highWatermark());
+        }
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
index 4efe3e3600a..c3ecb2c20c6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
@@ -65,7 +65,7 @@ public class KafkaRaftClientPreVoteTest {
         if (hasFetchedFromLeader) {
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -350,7 +350,7 @@ public class KafkaRaftClientPreVoteTest {
         if (hasFetchedFromLeader) {
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -654,7 +654,7 @@ public class KafkaRaftClientPreVoteTest {
         // After fetching successfully from the leader once, follower will no 
longer grant PreVotes
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 0c797d3ada3..71c2ea4d98d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -229,7 +229,7 @@ public class KafkaRaftClientReconfigTest {
         // check that follower will send fetch request to leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // check if leader response were to contain bootstrap snapshot id, 
follower would not send fetch snapshot request
         context.deliverResponse(
@@ -239,7 +239,7 @@ public class KafkaRaftClientReconfigTest {
         );
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @Test
@@ -259,7 +259,7 @@ public class KafkaRaftClientReconfigTest {
         // check that follower will send fetch request to leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // check that before receiving bootstrap records from leader, follower 
is not in the voter set
         assertFalse(context.client.quorum().isVoter(follower));
@@ -2142,7 +2142,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2179,7 +2179,7 @@ public class KafkaRaftClientReconfigTest {
         // after sending an update voter the next request should be a fetch
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -2215,7 +2215,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2255,7 +2255,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2306,7 +2306,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2342,7 +2342,7 @@ public class KafkaRaftClientReconfigTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // Election a new leader causes the replica to resend update voter 
request
         int newEpoch = epoch + 1;
@@ -2354,7 +2354,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2390,7 +2390,7 @@ public class KafkaRaftClientReconfigTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @Test
@@ -2640,7 +2640,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2661,7 +2661,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // after more than 3 fetch timeouts the update voter period timer 
should have expired.
         // check that the update voter period timer doesn't remain at zero (0) 
and cause the message queue to get
@@ -2701,7 +2701,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2721,7 +2721,7 @@ public class KafkaRaftClientReconfigTest {
         // expect one last FETCH request
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // don't send a response but increase the time
         context.time.sleep(context.requestTimeoutMs() - 1);
@@ -2781,7 +2781,7 @@ public class KafkaRaftClientReconfigTest {
             context.time.sleep(context.fetchTimeoutMs - 1);
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -2818,7 +2818,7 @@ public class KafkaRaftClientReconfigTest {
         // check that there is a fetch to the new leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, 
context.client.highWatermark());
         assertEquals(voter2.id(), fetchRequest.destination().id());
     }
 
@@ -2837,7 +2837,7 @@ public class KafkaRaftClientReconfigTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
         assertEquals(-2, fetchRequest.destination().id());
     }
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bc4be592a20..fd696458b80 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -155,7 +155,13 @@ public final class KafkaRaftClientSnapshotTest {
         long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, 
snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -163,7 +169,12 @@ public final class KafkaRaftClientSnapshotTest {
         );
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, localLogEndOffset, 
snapshotId.epoch());
+        context.assertSentFetchRequest(
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
 
         // Check that listener was notified of the new snapshot
         try (SnapshotReader<String> snapshot = 
context.listener.drainHandledSnapshot().get()) {
@@ -197,7 +208,13 @@ public final class KafkaRaftClientSnapshotTest {
         long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, 
snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -205,7 +222,12 @@ public final class KafkaRaftClientSnapshotTest {
         );
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, localLogEndOffset, 
snapshotId.epoch());
+        context.assertSentFetchRequest(
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
 
         RaftClientTestContext.MockListener secondListener = new 
RaftClientTestContext.MockListener(OptionalInt.of(localId));
         context.client.register(secondListener);
@@ -1145,7 +1167,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1162,7 +1184,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1179,7 +1201,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // Fetch timer is not reset; sleeping for remainder should transition 
to prospective
         context.time.sleep(context.fetchTimeoutMs - slept);
@@ -1206,7 +1228,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1249,7 +1271,13 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 
snapshotId.offset(), snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            snapshotId.offset(),
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
 
         // Check that the snapshot was written to the log
         RawSnapshotReader snapshot = 
context.log.readSnapshot(snapshotId).get();
@@ -1279,7 +1307,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1354,7 +1382,13 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 
snapshotId.offset(), snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            snapshotId.offset(),
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
 
         // Check that the snapshot was written to the log
         RawSnapshotReader snapshot = 
context.log.readSnapshot(snapshotId).get();
@@ -1384,7 +1418,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1426,7 +1460,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -1446,7 +1480,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1488,7 +1522,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -1507,7 +1541,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1549,7 +1583,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -1568,7 +1602,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1639,7 +1673,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1687,7 +1721,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         // Follower should send a fetch request
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1736,7 +1770,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         // Follower should send a fetch request
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -1755,7 +1789,7 @@ public final class KafkaRaftClientSnapshotTest {
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2014,7 +2048,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
+        context.assertFetchRequestData(fetchRequest, epoch, 1L, 1, 
context.client.highWatermark());
 
         // The response does not advance the high watermark
         List<String> records1 = List.of("b", "c");
@@ -2042,7 +2076,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
+        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, 
context.client.highWatermark());
 
         List<String> records2 = List.of("d", "e", "f");
         int batch2Epoch = 4;
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 95cc3c3c4fb..eddb99608fb 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -818,7 +818,7 @@ class KafkaRaftClientTest {
 
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -863,7 +863,7 @@ class KafkaRaftClientTest {
 
         context.assertUnknownLeaderAndNoVotedCandidate(0);
         context.pollUntilRequest();
-        RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 
0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 
0, OptionalLong.empty());
         assertTrue(context.client.quorum().isUnattached());
         assertTrue(context.client.quorum().isVoter());
 
@@ -1430,7 +1430,7 @@ class KafkaRaftClientTest {
         context.time.sleep(1);
 
         context.client.poll();
-        context.assertSentFetchRequest(leaderEpoch, 0, 0);
+        context.assertSentFetchRequest(leaderEpoch, 0, 0, 
OptionalLong.empty());
 
         context.time.sleep(context.electionBackoffMaxMs);
         context.client.poll();
@@ -1885,7 +1885,7 @@ class KafkaRaftClientTest {
 
         context.pollUntilRequest();
 
-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
     }
 
     @ParameterizedTest
@@ -1906,7 +1906,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, 
OptionalLong.empty());
     }
 
     @ParameterizedTest
@@ -1926,7 +1926,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, 
OptionalLong.empty());
 
         context.time.sleep(context.fetchTimeoutMs);
         context.client.poll();
@@ -1952,7 +1952,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, 
OptionalLong.empty());
 
         context.time.sleep(context.fetchTimeoutMs);
         context.pollUntilRequest();
@@ -1979,13 +1979,13 @@ class KafkaRaftClientTest {
             .build();
 
         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         assertTrue(context.client.quorum().isUnattached());
 
         context.time.sleep(context.electionTimeoutMs() * 2);
         context.pollUntilRequest();
         assertTrue(context.client.quorum().isUnattached());
-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         // confirm no vote request was sent
         assertEquals(0, context.channel.drainSendQueue().size());
 
@@ -1998,7 +1998,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         // observer cannot transition to prospective though
         assertTrue(context.client.quorum().isUnattached());
-        context.assertSentFetchRequest(epoch + 1, 0L, 0);
+        context.assertSentFetchRequest(epoch + 1, 0L, 0, OptionalLong.empty());
         assertEquals(0, context.channel.drainSendQueue().size());
     }
 
@@ -2017,7 +2017,7 @@ class KafkaRaftClientTest {
             .build();
 
         context.pollUntilRequest();
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0L, 0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0L, 0, OptionalLong.empty());
         assertTrue(context.client.quorum().isUnattached());
         assertTrue(context.client.quorum().isVoter());
 
@@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2081,7 +2081,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2095,7 +2095,7 @@ class KafkaRaftClientTest {
 
         fetchRequest = context.assertSentFetchRequest();
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2128,7 +2128,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2145,7 +2145,7 @@ class KafkaRaftClientTest {
         fetchRequest = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -2173,7 +2173,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound discoveryFetchRequest = 
context.assertSentFetchRequest();
         assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
         
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
-        context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         // Send a response with the leader and epoch
         context.deliverResponse(
@@ -2189,7 +2189,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound toLeaderFetchRequest = 
context.assertSentFetchRequest();
         assertEquals(leaderId, toLeaderFetchRequest.destination().id());
-        context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.time.sleep(context.requestTimeoutMs());
 
@@ -2198,7 +2198,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound retryToBootstrapServerFetchRequest = 
context.assertSentFetchRequest();
         
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
         
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
-        context.assertFetchRequestData(retryToBootstrapServerFetchRequest, 
epoch, 0L, 0);
+        context.assertFetchRequestData(retryToBootstrapServerFetchRequest, 
epoch, 0L, 0, context.client.highWatermark());
 
         // Deliver the delayed responses from the leader
         Records records = context.buildBatch(0L, 3, List.of("a", "b"));
@@ -2247,7 +2247,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound discoveryFetchRequest = 
context.assertSentFetchRequest();
         assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
         
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
-        context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, 
context.client.highWatermark());
 
         // Send a response with the leader and epoch
         context.deliverResponse(
@@ -2263,7 +2263,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound toLeaderFetchRequest = 
context.assertSentFetchRequest();
         assertEquals(leaderId, toLeaderFetchRequest.destination().id());
-        context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.time.sleep(context.requestTimeoutMs());
 
@@ -2272,7 +2272,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound retryToBootstrapServerFetchRequest = 
context.assertSentFetchRequest();
         
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
         
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
-        context.assertFetchRequestData(retryToBootstrapServerFetchRequest, 
epoch, 0L, 0);
+        context.assertFetchRequestData(retryToBootstrapServerFetchRequest, 
epoch, 0L, 0, context.client.highWatermark());
 
         // At this point toLeaderFetchRequest has timed out but 
retryToBootstrapServerFetchRequest
         // is still waiting for a response.
@@ -2378,17 +2378,23 @@ class KafkaRaftClientTest {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
 
         // null cluster id is accepted
-        context.deliverRequest(context.fetchRequest(epoch, null, otherNodeKey, 
-5L, 0, 0));
+        context.deliverRequest(
+            context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, 
OptionalLong.of(Long.MAX_VALUE), 0)
+        );
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
 
         // empty cluster id is rejected
-        context.deliverRequest(context.fetchRequest(epoch, "", otherNodeKey, 
-5L, 0, 0));
+        context.deliverRequest(
+            context.fetchRequest(epoch, "", otherNodeKey, -5L, 0, 
OptionalLong.of(Long.MAX_VALUE), 0)
+        );
         context.pollUntilResponse();
         
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
 
         // invalid cluster id is rejected
-        context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid", 
otherNodeKey, -5L, 0, 0));
+        context.deliverRequest(
+            context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0, 
OptionalLong.of(Long.MAX_VALUE), 0)
+        );
         context.pollUntilResponse();
         
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
@@ -2778,7 +2784,7 @@ class KafkaRaftClientTest {
 
         // Wait until we have a Fetch inflight to the leader
         context.pollUntilRequest();
-        RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
 
         // Now await the fetch timeout and become prospective
         context.time.sleep(context.fetchTimeoutMs);
@@ -2818,7 +2824,7 @@ class KafkaRaftClientTest {
 
         // Wait until we have a Fetch inflight to the leader
         context.pollUntilRequest();
-        RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
 
         // Now receive a BeginEpoch from `voter3`
         context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
@@ -2915,7 +2921,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destination().id());
-        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest1.correlationId(),
@@ -2929,7 +2935,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
-        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -2954,7 +2960,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destination().id());
-        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.time.sleep(context.requestTimeoutMs());
         context.pollUntilRequest();
@@ -2964,7 +2970,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
-        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -2985,12 +2991,12 @@ class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
 
-        context.discoverLeaderAsObserver(leaderId, epoch);
+        context.discoverLeaderAsObserver(leaderId, epoch, 
context.client.highWatermark());
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destination().id());
-        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest1.correlationId(),
@@ -3004,7 +3010,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
-        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest2.correlationId(),
@@ -3034,12 +3040,12 @@ class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
 
-        context.discoverLeaderAsObserver(leaderId, epoch);
+        context.discoverLeaderAsObserver(leaderId, epoch, 
context.client.highWatermark());
 
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destination().id());
-        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.time.sleep(context.requestTimeoutMs());
         context.pollUntilRequest();
@@ -3049,7 +3055,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destination().id());
         
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
-        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, 
context.client.highWatermark());
 
         context.deliverResponse(
             fetchRequest2.correlationId(),
@@ -3727,7 +3733,7 @@ class KafkaRaftClientTest {
 
         context.pollUntilRequest();
 
-        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         Records records = context.buildBatch(0L, 3, List.of("a", "b"));
         FetchResponseData response = context.fetchResponse(epoch, otherNodeId, 
records, 0L, Errors.NONE);
         context.deliverResponse(
@@ -3758,7 +3764,7 @@ class KafkaRaftClientTest {
 
         context.pollUntilRequest();
 
-        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         Records records = context.buildBatch(0L, 3, List.of("a", "b"));
         FetchResponseData response = context.fetchResponse(epoch, otherNodeId, 
records, 0L, Errors.NONE);
         context.deliverResponse(
@@ -3789,7 +3795,7 @@ class KafkaRaftClientTest {
 
         // Receive an empty fetch response
         context.pollUntilRequest();
-        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound fetchQuorumRequest = 
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         FetchResponseData fetchResponse = context.fetchResponse(
             epoch,
             otherNodeId,
@@ -3809,7 +3815,7 @@ class KafkaRaftClientTest {
         // Receive some records in the next poll, but do not advance high watermark
         context.pollUntilRequest();
         Records records = context.buildBatch(0L, epoch, List.of("a", "b"));
-        fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
+        fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, 
OptionalLong.of(0));
         fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L, 
Errors.NONE);
         context.deliverResponse(
             fetchQuorumRequest.correlationId(),
@@ -3822,7 +3828,7 @@ class KafkaRaftClientTest {
 
         // The next fetch response is empty, but should still advance the high watermark
         context.pollUntilRequest();
-        fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch);
+        fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch, 
OptionalLong.of(0));
         fetchResponse = context.fetchResponse(
             epoch,
             otherNodeId,
@@ -3974,10 +3980,20 @@ class KafkaRaftClientTest {
 
         context.pollUntilRequest();
 
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
3L, lastEpoch);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(
+            epoch,
+            3L,
+            lastEpoch,
+            OptionalLong.empty()
+        );
 
-        FetchResponseData response = context.divergingFetchResponse(epoch, 
otherNodeId, 2L,
-            lastEpoch, 1L);
+        FetchResponseData response = context.divergingFetchResponse(
+            epoch,
+            otherNodeId,
+            2L,
+            lastEpoch,
+            1L
+        );
         context.deliverResponse(request.correlationId(), 
request.destination(), response);
 
         // Poll again to complete truncation
@@ -3987,7 +4003,7 @@ class KafkaRaftClientTest {
 
         // Now we should be fetching
         context.client.poll();
-        context.assertSentFetchRequest(epoch, 2L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 2L, lastEpoch, 
context.client.highWatermark());
     }
 
     @ParameterizedTest
@@ -4255,7 +4271,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
context.client.highWatermark());
 
         // The response does not advance the high watermark
         List<String> records1 = List.of("a", "b", "c");
@@ -4276,7 +4292,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
+        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, 
context.client.highWatermark());
 
         // The high watermark advances to include the first batch we fetched
         List<String> records2 = List.of("d", "e", "f");
@@ -4514,7 +4530,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         
assertTrue(context.bootstrapIds.contains(fetchRequest1.destination().id()));
-        context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest1, 0, 0L, 0, 
context.client.highWatermark());
 
         int leaderEpoch = 5;
 
@@ -4531,7 +4547,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest2.destination().id());
-        context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0, 
context.client.highWatermark());
 
         List<String> records = List.of("a", "b", "c");
         MemoryRecords batch1 = context.buildBatch(0L, 3, records);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 9f0a5084326..b84c603ac9f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -957,7 +957,7 @@ public final class RaftClientTestContext {
         return requests;
     }
 
-    private List<RaftResponse.Outbound> drainSentResponses(
+    List<RaftResponse.Outbound> drainSentResponses(
         ApiKeys apiKey
     ) {
         List<RaftResponse.Outbound> res = new ArrayList<>();
@@ -1114,23 +1114,20 @@ public final class RaftClientTestContext {
     RaftRequest.Outbound assertSentFetchRequest(
         int epoch,
         long fetchOffset,
-        int lastFetchedEpoch
+        int lastFetchedEpoch,
+        OptionalLong highWatermark
     ) {
         List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
         assertEquals(1, sentMessages.size());
 
         RaftRequest.Outbound raftRequest = sentMessages.get(0);
-        assertFetchRequestData(raftRequest, epoch, fetchOffset, 
lastFetchedEpoch);
+        assertFetchRequestData(raftRequest, epoch, fetchOffset, 
lastFetchedEpoch, highWatermark);
         return raftRequest;
     }
 
-    FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
-        List<RaftResponse.Outbound> sentMessages = 
drainSentResponses(ApiKeys.FETCH);
-        assertEquals(
-            1, sentMessages.size(), "Found unexpected sent messages " + 
sentMessages);
-        RaftResponse.Outbound raftMessage = sentMessages.get(0);
-        assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
-        FetchResponseData response = (FetchResponseData) raftMessage.data();
+    FetchResponseData.PartitionData 
assertFetchResponseData(RaftResponse.Outbound message) {
+        assertEquals(ApiKeys.FETCH.id, message.data().apiKey());
+        FetchResponseData response = (FetchResponseData) message.data();
         assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
 
         assertEquals(1, response.responses().size());
@@ -1152,17 +1149,30 @@ public final class RaftClientTestContext {
         return partitionResponse;
     }
 
+    FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
+        List<RaftResponse.Outbound> sentMessages = 
drainSentResponses(ApiKeys.FETCH);
+        assertEquals(
+            1,
+            sentMessages.size(),
+            "Found unexpected sent messages " + sentMessages
+        );
+
+        return assertFetchResponseData(sentMessages.get(0));
+    }
+
     void assertSentFetchPartitionResponse(Errors topLevelError) {
         List<RaftResponse.Outbound> sentMessages = 
drainSentResponses(ApiKeys.FETCH);
         assertEquals(
-            1, sentMessages.size(), "Found unexpected sent messages " + 
sentMessages);
+            1,
+            sentMessages.size(),
+            "Found unexpected sent messages " + sentMessages
+        );
         RaftResponse.Outbound raftMessage = sentMessages.get(0);
         assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
         FetchResponseData response = (FetchResponseData) raftMessage.data();
         assertEquals(topLevelError, Errors.forCode(response.errorCode()));
     }
 
-
     MemoryRecords assertSentFetchPartitionResponse(
         Errors error,
         int epoch,
@@ -1375,7 +1385,8 @@ public final class RaftClientTestContext {
 
     void discoverLeaderAsObserver(
         int leaderId,
-        int epoch
+        int epoch,
+        OptionalLong highWatermark
     ) throws Exception {
         pollUntilRequest();
         RaftRequest.Outbound fetchRequest = assertSentFetchRequest();
@@ -1384,7 +1395,7 @@ public final class RaftClientTestContext {
             startingVoters.voterIds().contains(destinationId) || 
bootstrapIds.contains(destinationId),
             String.format("id %d is not in sets %s or %s", destinationId, 
startingVoters, bootstrapIds)
         );
-        assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        assertFetchRequestData(fetchRequest, 0, 0L, 0, highWatermark);
 
         deliverResponse(
             fetchRequest.correlationId(),
@@ -1672,7 +1683,8 @@ public final class RaftClientTestContext {
         RaftRequest.Outbound message,
         int epoch,
         long fetchOffset,
-        int lastFetchedEpoch
+        int lastFetchedEpoch,
+        OptionalLong highWatermark
     ) {
         assertInstanceOf(
             FetchRequestData.class,
@@ -1691,6 +1703,7 @@ public final class RaftClientTestContext {
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
         assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
         assertEquals(localId.orElse(-1), request.replicaState().replicaId());
+        assertEquals(highWatermark.orElse(-1), fetchPartition.highWatermark());
 
         // Assert that voters have flushed up to the fetch offset
         if ((localId.isPresent() && 
startingVoters.voterIds().contains(localId.getAsInt())) ||
@@ -1716,6 +1729,24 @@ public final class RaftClientTestContext {
         long fetchOffset,
         int lastFetchedEpoch,
         int maxWaitTimeMs
+    ) {
+        return fetchRequest(
+            epoch,
+            replicaKey,
+            fetchOffset,
+            lastFetchedEpoch,
+            OptionalLong.of(Long.MAX_VALUE),
+            maxWaitTimeMs
+        );
+    }
+
+    FetchRequestData fetchRequest(
+        int epoch,
+        ReplicaKey replicaKey,
+        long fetchOffset,
+        int lastFetchedEpoch,
+        OptionalLong highWatermark,
+        int maxWaitTimeMs
     ) {
         return fetchRequest(
             epoch,
@@ -1723,6 +1754,7 @@ public final class RaftClientTestContext {
             replicaKey,
             fetchOffset,
             lastFetchedEpoch,
+            highWatermark,
             maxWaitTimeMs
         );
     }
@@ -1733,6 +1765,7 @@ public final class RaftClientTestContext {
         ReplicaKey replicaKey,
         long fetchOffset,
         int lastFetchedEpoch,
+        OptionalLong highWatermark,
         int maxWaitTimeMs
     ) {
         FetchRequestData request = RaftUtil.singletonFetchRequest(
@@ -1742,7 +1775,8 @@ public final class RaftClientTestContext {
                 fetchPartition
                     .setCurrentLeaderEpoch(epoch)
                     .setLastFetchedEpoch(lastFetchedEpoch)
-                    .setFetchOffset(fetchOffset);
+                    .setFetchOffset(fetchOffset)
+                    .setHighWatermark(highWatermark.orElse(-1));
                 if (raftProtocol.isReconfigSupported()) {
                     fetchPartition
                         
.setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
@@ -1932,7 +1966,9 @@ public final class RaftClientTestContext {
     }
 
     private short fetchRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
+        if (raftProtocol.isHwmInFetchSupported()) {
+            return 18;
+        } else if (raftProtocol.isReconfigSupported()) {
             return 17;
         } else {
             return 16;
@@ -2236,7 +2272,9 @@ public final class RaftClientTestContext {
         // dynamic quorum reconfiguration support
         KIP_853_PROTOCOL,
         // preVote support
-        KIP_996_PROTOCOL;
+        KIP_996_PROTOCOL,
+        // HWM in FETCH request support
+        KIP_1166_PROTOCOL;
 
         boolean isKRaftSupported() {
             return isAtLeast(KIP_595_PROTOCOL);
@@ -2250,6 +2288,10 @@ public final class RaftClientTestContext {
             return isAtLeast(KIP_996_PROTOCOL);
         }
 
+        boolean isHwmInFetchSupported() {
+            return isAtLeast(KIP_1166_PROTOCOL);
+        }
+
         private boolean isAtLeast(RaftProtocol otherRpc) {
             return this.compareTo(otherRpc) >= 0;
         }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 9e9ed6f8447..f05db6d187a 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -116,6 +116,9 @@ public enum MetadataVersion {
     // Streams groups are early access in 4.1 (KIP-1071).
     IBP_4_1_IV0(26, "4.1", "IV0", false),
 
+    // Send FETCH version 18 in the replica fetcher (KIP-1166)
+    IBP_4_1_IV1(27, "4.1", "IV1", false),
+
     // Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
     // IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
     // a placeholder.
@@ -126,7 +129,7 @@ public enum MetadataVersion {
     // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE   ***
     // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
     // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.                                      ***
-    IBP_4_2_IV0(27, "4.2", "IV0", false),
+    IBP_4_2_IV0(28, "4.2", "IV0", false),
 
     // Enables "streams" groups by default for new clusters (KIP-1071).
     //
@@ -134,7 +137,7 @@ public enum MetadataVersion {
     // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS ***
     // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON   ***
     // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY.                                   ***
-    IBP_4_2_IV1(28, "4.2", "IV1", false);
+    IBP_4_2_IV1(29, "4.2", "IV1", false);
 
     // NOTES when adding a new version:
     //   Update the default version in @ClusterTest annotation to point to the 
latest version
@@ -264,13 +267,15 @@ public enum MetadataVersion {
     }
 
     public short fetchRequestVersion() {
-        if (this.isAtLeast(IBP_3_9_IV0)) {
+        if (isAtLeast(IBP_4_1_IV1)) {
+            return 18;
+        } else if (isAtLeast(IBP_3_9_IV0)) {
             return 17;
-        } else if (this.isAtLeast(IBP_3_7_IV4)) {
+        } else if (isAtLeast(IBP_3_7_IV4)) {
             return 16;
-        } else if (this.isAtLeast(IBP_3_5_IV1)) {
+        } else if (isAtLeast(IBP_3_5_IV1)) {
             return 15;
-        } else if (this.isAtLeast(IBP_3_5_IV0)) {
+        } else if (isAtLeast(IBP_3_5_IV0)) {
             return 14;
         } else {
             return 13;
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index c416cfb67d5..23794fde622 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -121,8 +121,16 @@ public class FeatureCommandTest {
                         "disable", "--feature", "metadata.version"))
         );
         // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
-        assertEquals("Could not disable metadata.version. The update failed 
for all features since the following " +
-                "feature had an error: Invalid update version 0 for feature 
metadata.version. Local controller 3000 only supports versions 7-28", 
commandOutput);
+        assertEquals(
+            String.format(
+                "Could not disable metadata.version. The update failed for all 
features since the " +
+                "following feature had an error: Invalid update version 0 for 
feature " +
+                "metadata.version. Local controller 3000 only supports 
versions %s-%s",
+                MetadataVersion.MINIMUM_VERSION.featureLevel(),
+                MetadataVersion.latestTesting().featureLevel()
+            ),
+            commandOutput
+        );
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),

Reply via email to