This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 742b327025f KAFKA-14145; Faster KRaft HWM replication (#19800)
742b327025f is described below
commit 742b327025f102770054756f99e38b692efd42d4
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jun 17 13:00:43 2025 -0400
KAFKA-14145; Faster KRaft HWM replication (#19800)
This change compares the remote replica's HWM with the leader's HWM and
completes the FETCH request if the remote HWM is less than the leader's
HWM. When the leader's HWM is updated any pending FETCH RPC is
completed.
Reviewers: Alyssa Huang <[email protected]>, David Arthur
<[email protected]>, Andrew Schofield <[email protected]>
---
.../resources/common/message/FetchRequest.json | 9 +-
.../resources/common/message/FetchResponse.json | 4 +-
.../controller/FeatureControlManagerTest.java | 26 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 85 ++++--
.../kafka/raft/KafkaRaftClientClusterAuthTest.java | 2 +-
.../kafka/raft/KafkaRaftClientFetchTest.java | 331 ++++++++++++++++++++-
.../kafka/raft/KafkaRaftClientPreVoteTest.java | 6 +-
.../kafka/raft/KafkaRaftClientReconfigTest.java | 36 +--
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 82 +++--
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 116 ++++----
.../apache/kafka/raft/RaftClientTestContext.java | 78 +++--
.../kafka/server/common/MetadataVersion.java | 17 +-
.../org/apache/kafka/tools/FeatureCommandTest.java | 12 +-
13 files changed, 641 insertions(+), 163 deletions(-)
diff --git a/clients/src/main/resources/common/message/FetchRequest.json
b/clients/src/main/resources/common/message/FetchRequest.json
index b7ad185f60b..b4dd880faed 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -56,7 +56,9 @@
// Version 16 is the same as version 15 (KIP-951).
//
// Version 17 adds directory id support from KIP-853
- "validVersions": "4-17",
+ //
+ // Version 18 adds high-watermark from KIP-1166
+ "validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+",
"nullableVersions": "12+", "default": "null",
@@ -103,7 +105,10 @@
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. See
KIP-74 for cases where this limit may not be honored." },
{ "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+",
"taggedVersions": "17+", "tag": 0, "ignorable": true,
- "about": "The directory id of the follower fetching." }
+ "about": "The directory id of the follower fetching." },
+ { "name": "HighWatermark", "type": "int64", "versions": "18+",
"default": "9223372036854775807", "taggedVersions": "18+",
+ "tag": 1, "ignorable": true,
+ "about": "The high-watermark known by the replica. -1 if the
high-watermark is not known and 9223372036854775807 if the feature is not
supported." }
]}
]},
{ "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions":
"7+", "ignorable": false,
diff --git a/clients/src/main/resources/common/message/FetchResponse.json
b/clients/src/main/resources/common/message/FetchResponse.json
index dc8d3517566..36dc05ff60c 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -48,7 +48,9 @@
// Version 16 adds the 'NodeEndpoints' field (KIP-951).
//
// Version 17 no changes to the response (KIP-853).
- "validVersions": "4-17",
+ //
+ // Version 18 no changes to the response (KIP-1166).
+ "validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"ignorable": true,
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 5ff9cff626d..774aff96510 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -394,13 +394,25 @@ public class FeatureControlManagerTest {
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel())).
build();
manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
- assertEquals(ControllerResult.of(List.of(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-28")),
- manager.updateFeatures(
- Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
- Map.of(MetadataVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
- true,
- 0));
+ assertEquals(
+ ControllerResult.of(
+ List.of(),
+ new ApiError(
+ Errors.INVALID_UPDATE_VERSION,
+ String.format(
+ "Invalid update version 6 for feature
metadata.version. Local controller 0 only supports versions %s-%s",
+ MetadataVersion.MINIMUM_VERSION.featureLevel(),
+ MetadataVersion.latestTesting().featureLevel()
+ )
+ )
+ ),
+ manager.updateFeatures(
+ Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
+ Map.of(MetadataVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+ true,
+ 0
+ )
+ );
}
@Test
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 760d6b8c159..970ee2d6910 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
@@ -386,6 +385,11 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// records still held in memory directly to the listener
appendPurgatory.maybeComplete(highWatermark.offset(),
currentTimeMs);
+ // After updating the high-watermark, complete all of the deferred
+ // fetch requests. This is always correct because all deferred fetch
+ // requests have a HWM less than or equal to the previous leader's HWM.
+ fetchPurgatory.completeAll(currentTimeMs);
+
// It is also possible that the high watermark is being updated
// for the first time following the leader election, so we need
// to give lagging listeners an opportunity to catch up as well
@@ -741,7 +745,10 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
- Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request
since this node is resigning"));
+ Errors.NOT_LEADER_OR_FOLLOWER.exception(
+ "Not handling request since this node is resigning"
+ )
+ );
quorum.transitionToResigned(preferredSuccessors);
resetConnections();
}
@@ -753,12 +760,18 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// After becoming a follower, we need to complete all pending fetches
so that
// they can be re-sent to the leader without waiting for their
expirations
- fetchPurgatory.completeAllExceptionally(new
NotLeaderOrFollowerException(
- "Cannot process the fetch request because the node is no longer
the leader."));
+ fetchPurgatory.completeAllExceptionally(
+ Errors.NOT_LEADER_OR_FOLLOWER.exception(
+ "Cannot process the fetch request because the node is no
longer the leader"
+ )
+ );
// Clearing the append purgatory should complete all futures
exceptionally since this node is no longer the leader
- appendPurgatory.completeAllExceptionally(new
NotLeaderOrFollowerException(
- "Failed to receive sufficient acknowledgments for this append
before leader change."));
+ appendPurgatory.completeAllExceptionally(
+ Errors.NOT_LEADER_OR_FOLLOWER.exception(
+ "Failed to receive sufficient acknowledgments for this append
before leader change"
+ )
+ );
}
private void transitionToFollower(
@@ -1514,19 +1527,22 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
|| FetchResponse.recordsSize(partitionResponse) > 0
|| request.maxWaitMs() == 0
|| isPartitionDiverged(partitionResponse)
- || isPartitionSnapshotted(partitionResponse)) {
+ || isPartitionSnapshotted(partitionResponse)
+ || isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
// Reply immediately if any of the following is true
// 1. The response contains an error
// 2. There are records in the response
// 3. The fetching replica doesn't want to wait for the partition
to contain new data
// 4. The fetching replica needs to truncate because the log
diverged
// 5. The fetching replica needs to fetch a snapshot
+ // 6. The fetching replica should update its high-watermark
return completedFuture(response);
}
CompletableFuture<Long> future = fetchPurgatory.await(
fetchPartition.fetchOffset(),
- request.maxWaitMs());
+ request.maxWaitMs()
+ );
return future.handle((completionTimeMs, exception) -> {
if (exception != null) {
@@ -1556,26 +1572,25 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Optional.empty()
);
}
- }
-
- // FIXME: `completionTimeMs`, which can be null
- logger.trace(
- "Completing delayed fetch from {} starting at offset {} at {}",
- replicaKey,
- fetchPartition.fetchOffset(),
- completionTimeMs
- );
+ } else {
+ logger.trace(
+ "Completing delayed fetch from {} starting at offset {} at
{}",
+ replicaKey,
+ fetchPartition.fetchOffset(),
+ completionTimeMs
+ );
- // It is safe to call tryCompleteFetchRequest because only the
polling thread completes this
- // future successfully. This is true because only the polling
thread appends record batches to
- // the log from maybeAppendBatches.
- return tryCompleteFetchRequest(
- requestMetadata.listenerName(),
- requestMetadata.apiVersion(),
- replicaKey,
- fetchPartition,
- time.milliseconds()
- );
+ // It is safe to call tryCompleteFetchRequest because only the
polling thread completes
+ // this future successfully. The future is completed
successfully either because of an
+ // append (maybeAppendBatches) or because the HWM was updated
(onUpdateLeaderHighWatermark)
+ return tryCompleteFetchRequest(
+ requestMetadata.listenerName(),
+ requestMetadata.apiVersion(),
+ replicaKey,
+ fetchPartition,
+ completionTimeMs
+ );
+ }
});
}
@@ -1633,18 +1648,29 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
}
- private static boolean isPartitionDiverged(FetchResponseData.PartitionData
partitionResponseData) {
+ private static boolean isPartitionDiverged(
+ FetchResponseData.PartitionData partitionResponseData
+ ) {
FetchResponseData.EpochEndOffset divergingEpoch =
partitionResponseData.divergingEpoch();
return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() !=
-1;
}
- private static boolean
isPartitionSnapshotted(FetchResponseData.PartitionData partitionResponseData) {
+ private static boolean isPartitionSnapshotted(
+ FetchResponseData.PartitionData partitionResponseData
+ ) {
FetchResponseData.SnapshotId snapshotId =
partitionResponseData.snapshotId();
return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1;
}
+ private static boolean isHighWatermarkUpdated(
+ FetchResponseData.PartitionData partitionResponseData,
+ FetchRequestData.FetchPartition partitionRequestData
+ ) {
+ return partitionRequestData.highWatermark() <
partitionResponseData.highWatermark();
+ }
+
private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
if (leaderIdOrNil < 0)
return OptionalInt.empty();
@@ -2882,6 +2908,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
.setLastFetchedEpoch(log.lastFetchedEpoch())
.setFetchOffset(log.endOffset().offset())
.setReplicaDirectoryId(quorum.localDirectoryId())
+
.setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))
);
return request
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
index 62c8e117698..5fa6a0781a8 100644
---
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
@@ -53,7 +53,7 @@ public class KafkaRaftClientClusterAuthTest {
context.pollUntilRequest();
- RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0, 0);
+ RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0, 0, context.client.highWatermark());
FetchResponseData response = new FetchResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
context.deliverResponse(
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index ade509d8051..cfbc03070bb 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ArbitraryMemoryRecords;
import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
@@ -33,7 +34,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -72,7 +76,7 @@ public final class KafkaRaftClientFetchTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
long oldLogEndOffset = context.log.endOffset().offset();
@@ -107,7 +111,7 @@ public final class KafkaRaftClientFetchTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
long oldLogEndOffset = context.log.endOffset().offset();
int numberOfRecords = 10;
@@ -149,4 +153,327 @@ public final class KafkaRaftClientFetchTest {
// Check that only the first batch was appended because the second
batch has a greater epoch
assertEquals(oldLogEndOffset + numberOfRecords,
context.log.endOffset().offset());
}
+
+ @Test
+ void testHighWatermarkSentInFetchRequest() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ var localLogEndOffset = context.log.endOffset().offset();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ localLogEndOffset,
+ epoch,
+ OptionalLong.empty()
+ );
+
+ // Set the HWM to the LEO
+ context.deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ context.fetchResponse(
+ epoch,
+ electedLeader.id(),
+ MemoryRecords.EMPTY,
+ localLogEndOffset,
+ Errors.NONE
+ )
+ );
+
+ context.pollUntilRequest();
+ fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ localLogEndOffset,
+ epoch,
+ OptionalLong.of(localLogEndOffset)
+ );
+ }
+
+ @Test
+ void testDefaultHwmDeferred() throws Exception {
+ var epoch = 2;
+ var local = KafkaRaftClientTest.replicaKey(
+ KafkaRaftClientTest.randomReplicaId(),
+ true
+ );
+ var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+ var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, voter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ var localLogEndOffset = context.log.endOffset().offset();
+ var lastFetchedEpoch = context.log.lastFetchedEpoch();
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ Integer.MAX_VALUE
+ )
+ );
+
+ // Check that the fetch response was deferred
+ for (var i = 0; i < 10; ++i) {
+ context.client.poll();
+ assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+ }
+ }
+
+ @Test
+ void testUnknownHwmDeferredWhenLeaderDoesNotKnowHwm() throws Exception {
+ var epoch = 2;
+ var local = KafkaRaftClientTest.replicaKey(
+ KafkaRaftClientTest.randomReplicaId(),
+ true
+ );
+ var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+ var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, voter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ var localLogEndOffset = context.log.endOffset().offset();
+ var lastFetchedEpoch = context.log.lastFetchedEpoch();
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ OptionalLong.empty(),
+ Integer.MAX_VALUE
+ )
+ );
+
+ // Check that the fetch response was deferred
+ for (var i = 0; i < 10; ++i) {
+ context.client.poll();
+ assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+ }
+ }
+
+ @Test
+ void testOutdatedHwmCompletedWhenLeaderKnowsHwm() throws Exception {
+ var epoch = 2;
+ var local = KafkaRaftClientTest.replicaKey(
+ KafkaRaftClientTest.randomReplicaId(),
+ true
+ );
+ var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+ var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, voter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ var localLogEndOffset = context.log.endOffset().offset();
+ var lastFetchedEpoch = context.log.lastFetchedEpoch();
+
+ // FETCH response completed when remote replica doesn't know HWM
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ OptionalLong.empty(),
+ Integer.MAX_VALUE
+ )
+ );
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
+
+ // FETCH response completed when remote replica has outdated HWM
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ OptionalLong.of(localLogEndOffset - 1),
+ Integer.MAX_VALUE
+ )
+ );
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
+ }
+
+ @Test
+ void testUnchangedHighWatermarkDeferred() throws Exception {
+ var epoch = 2;
+ var local = KafkaRaftClientTest.replicaKey(
+ KafkaRaftClientTest.randomReplicaId(),
+ true
+ );
+ var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+ var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, voter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ var localLogEndOffset = context.log.endOffset().offset();
+ var lastFetchedEpoch = context.log.lastFetchedEpoch();
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ OptionalLong.of(localLogEndOffset),
+ Integer.MAX_VALUE
+ )
+ );
+
+ // Check that the fetch response was deferred
+ for (var i = 0; i < 10; ++i) {
+ context.client.poll();
+ assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+ }
+ }
+
+ @Test
+ void testUpdatedHighWatermarkCompleted() throws Exception {
+ var epoch = 2;
+ var local = KafkaRaftClientTest.replicaKey(
+ KafkaRaftClientTest.randomReplicaId(),
+ true
+ );
+ var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+ var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, voter)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Establish a HWM (3) but don't set it to the LEO
+ context.deliverRequest(context.fetchRequest(epoch, voter, 3L, 2, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
+
+ var localLogEndOffset = context.log.endOffset().offset();
+ var lastFetchedEpoch = context.log.lastFetchedEpoch();
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ remote,
+ localLogEndOffset,
+ lastFetchedEpoch,
+ OptionalLong.of(localLogEndOffset),
+ Integer.MAX_VALUE
+ )
+ );
+
+ // Check that the fetch response was deferred
+ for (var i = 0; i < 10; ++i) {
+ context.client.poll();
+ assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
+ }
+
+ // Update the HWM and complete the deferred FETCH response
+ context.deliverRequest(
+ context.fetchRequest(epoch, voter, localLogEndOffset,
lastFetchedEpoch, 0)
+ );
+ context.pollUntilResponse();
+
+ // Check that two fetch requests were completed
+ var fetchResponses = context.drainSentResponses(ApiKeys.FETCH);
+ for (var fetchResponse : fetchResponses) {
+ var partitionResponse =
context.assertFetchResponseData(fetchResponse);
+ assertEquals(Errors.NONE,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(epoch,
partitionResponse.currentLeader().leaderEpoch());
+ assertEquals(localLogEndOffset, partitionResponse.highWatermark());
+ }
+ }
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
index 4efe3e3600a..c3ecb2c20c6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
@@ -65,7 +65,7 @@ public class KafkaRaftClientPreVoteTest {
if (hasFetchedFromLeader) {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -350,7 +350,7 @@ public class KafkaRaftClientPreVoteTest {
if (hasFetchedFromLeader) {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -654,7 +654,7 @@ public class KafkaRaftClientPreVoteTest {
// After fetching successfully from the leader once, follower will no
longer grant PreVotes
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 0c797d3ada3..71c2ea4d98d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -229,7 +229,7 @@ public class KafkaRaftClientReconfigTest {
// check that follower will send fetch request to leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// check if leader response were to contain bootstrap snapshot id,
follower would not send fetch snapshot request
context.deliverResponse(
@@ -239,7 +239,7 @@ public class KafkaRaftClientReconfigTest {
);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
}
@Test
@@ -259,7 +259,7 @@ public class KafkaRaftClientReconfigTest {
// check that follower will send fetch request to leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// check that before receiving bootstrap records from leader, follower
is not in the voter set
assertFalse(context.client.quorum().isVoter(follower));
@@ -2142,7 +2142,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2179,7 +2179,7 @@ public class KafkaRaftClientReconfigTest {
// after sending an update voter the next request should be a fetch
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -2215,7 +2215,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2255,7 +2255,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2306,7 +2306,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2342,7 +2342,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// Election a new leader causes the replica to resend update voter
request
int newEpoch = epoch + 1;
@@ -2354,7 +2354,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2390,7 +2390,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0,
context.client.highWatermark());
}
@Test
@@ -2640,7 +2640,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2661,7 +2661,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// after more than 3 fetch timeouts the update voter period timer
should have expired.
// check that the update voter period timer doesn't remain at zero (0)
and cause the message queue to get
@@ -2701,7 +2701,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2721,7 +2721,7 @@ public class KafkaRaftClientReconfigTest {
// expect one last FETCH request
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// don't send a response but increase the time
context.time.sleep(context.requestTimeoutMs() - 1);
@@ -2781,7 +2781,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2818,7 +2818,7 @@ public class KafkaRaftClientReconfigTest {
// check that there is a fetch to the new leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0,
context.client.highWatermark());
assertEquals(voter2.id(), fetchRequest.destination().id());
}
@@ -2837,7 +2837,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
assertEquals(-2, fetchRequest.destination().id());
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bc4be592a20..fd696458b80 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -155,7 +155,13 @@ public final class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset,
snapshotId.epoch());
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ localLogEndOffset,
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@@ -163,7 +169,12 @@ public final class KafkaRaftClientSnapshotTest {
);
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, localLogEndOffset,
snapshotId.epoch());
+ context.assertSentFetchRequest(
+ epoch,
+ localLogEndOffset,
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot =
context.listener.drainHandledSnapshot().get()) {
@@ -197,7 +208,13 @@ public final class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset,
snapshotId.epoch());
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ localLogEndOffset,
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@@ -205,7 +222,12 @@ public final class KafkaRaftClientSnapshotTest {
);
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, localLogEndOffset,
snapshotId.epoch());
+ context.assertSentFetchRequest(
+ epoch,
+ localLogEndOffset,
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
RaftClientTestContext.MockListener secondListener = new
RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
@@ -1145,7 +1167,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1162,7 +1184,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1179,7 +1201,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// Fetch timer is not reset; sleeping for remainder should transition
to prospective
context.time.sleep(context.fetchTimeoutMs - slept);
@@ -1206,7 +1228,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1249,7 +1271,13 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch,
snapshotId.offset(), snapshotId.epoch());
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ snapshotId.offset(),
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
// Check that the snapshot was written to the log
RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
@@ -1279,7 +1307,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1354,7 +1382,13 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch,
snapshotId.offset(), snapshotId.epoch());
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ snapshotId.offset(),
+ snapshotId.epoch(),
+ context.client.highWatermark()
+ );
// Check that the snapshot was written to the log
RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
@@ -1384,7 +1418,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1426,7 +1460,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -1446,7 +1480,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1488,7 +1522,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -1507,7 +1541,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1549,7 +1583,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -1568,7 +1602,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1639,7 +1673,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1687,7 +1721,7 @@ public final class KafkaRaftClientSnapshotTest {
// Follower should send a fetch request
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -1736,7 +1770,7 @@ public final class KafkaRaftClientSnapshotTest {
// Follower should send a fetch request
fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -1755,7 +1789,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2014,7 +2048,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
+ context.assertFetchRequestData(fetchRequest, epoch, 1L, 1,
context.client.highWatermark());
// The response does not advance the high watermark
List<String> records1 = List.of("b", "c");
@@ -2042,7 +2076,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
+ context.assertFetchRequestData(fetchRequest, epoch, 3L, 3,
context.client.highWatermark());
List<String> records2 = List.of("d", "e", "f");
int batch2Epoch = 4;
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 95cc3c3c4fb..eddb99608fb 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -818,7 +818,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -863,7 +863,7 @@ class KafkaRaftClientTest {
context.assertUnknownLeaderAndNoVotedCandidate(0);
context.pollUntilRequest();
- RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L,
0);
+ RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L,
0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
@@ -1430,7 +1430,7 @@ class KafkaRaftClientTest {
context.time.sleep(1);
context.client.poll();
- context.assertSentFetchRequest(leaderEpoch, 0, 0);
+ context.assertSentFetchRequest(leaderEpoch, 0, 0,
OptionalLong.empty());
context.time.sleep(context.electionBackoffMaxMs);
context.client.poll();
@@ -1885,7 +1885,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, 0L, 0);
+ context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
}
@ParameterizedTest
@@ -1906,7 +1906,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+ context.assertSentFetchRequest(epoch, 1L, lastEpoch,
OptionalLong.empty());
}
@ParameterizedTest
@@ -1926,7 +1926,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+ context.assertSentFetchRequest(epoch, 1L, lastEpoch,
OptionalLong.empty());
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
@@ -1952,7 +1952,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+ context.assertSentFetchRequest(epoch, 1L, lastEpoch,
OptionalLong.empty());
context.time.sleep(context.fetchTimeoutMs);
context.pollUntilRequest();
@@ -1979,13 +1979,13 @@ class KafkaRaftClientTest {
.build();
context.pollUntilRequest();
- context.assertSentFetchRequest(epoch, 0L, 0);
+ context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
context.time.sleep(context.electionTimeoutMs() * 2);
context.pollUntilRequest();
assertTrue(context.client.quorum().isUnattached());
- context.assertSentFetchRequest(epoch, 0L, 0);
+ context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// confirm no vote request was sent
assertEquals(0, context.channel.drainSendQueue().size());
@@ -1998,7 +1998,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
// observer cannot transition to prospective though
assertTrue(context.client.quorum().isUnattached());
- context.assertSentFetchRequest(epoch + 1, 0L, 0);
+ context.assertSentFetchRequest(epoch + 1, 0L, 0, OptionalLong.empty());
assertEquals(0, context.channel.drainSendQueue().size());
}
@@ -2017,7 +2017,7 @@ class KafkaRaftClientTest {
.build();
context.pollUntilRequest();
- RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0L, 0);
+ RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
@@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2081,7 +2081,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2095,7 +2095,7 @@ class KafkaRaftClientTest {
fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2128,7 +2128,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@@ -2145,7 +2145,7 @@ class KafkaRaftClientTest {
fetchRequest = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -2173,7 +2173,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest =
context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
- context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0,
context.client.highWatermark());
// Send a response with the leader and epoch
context.deliverResponse(
@@ -2189,7 +2189,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest =
context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id());
- context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
@@ -2198,7 +2198,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest =
context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
- context.assertFetchRequestData(retryToBootstrapServerFetchRequest,
epoch, 0L, 0);
+ context.assertFetchRequestData(retryToBootstrapServerFetchRequest,
epoch, 0L, 0, context.client.highWatermark());
// Deliver the delayed responses from the leader
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
@@ -2247,7 +2247,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest =
context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
- context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
+ context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0,
context.client.highWatermark());
// Send a response with the leader and epoch
context.deliverResponse(
@@ -2263,7 +2263,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest =
context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id());
- context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0,
context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
@@ -2272,7 +2272,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest =
context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
- context.assertFetchRequestData(retryToBootstrapServerFetchRequest,
epoch, 0L, 0);
+ context.assertFetchRequestData(retryToBootstrapServerFetchRequest,
epoch, 0L, 0, context.client.highWatermark());
// At this point toLeaderFetchRequest has timed out but
retryToBootstrapServerFetchRequest
// is still waiting for a response.
@@ -2378,17 +2378,23 @@ class KafkaRaftClientTest {
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST,
epoch, OptionalInt.of(localId));
// null cluster id is accepted
- context.deliverRequest(context.fetchRequest(epoch, null, otherNodeKey,
-5L, 0, 0));
+ context.deliverRequest(
+ context.fetchRequest(epoch, null, otherNodeKey, -5L, 0,
OptionalLong.of(Long.MAX_VALUE), 0)
+ );
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST,
epoch, OptionalInt.of(localId));
// empty cluster id is rejected
- context.deliverRequest(context.fetchRequest(epoch, "", otherNodeKey,
-5L, 0, 0));
+ context.deliverRequest(
+ context.fetchRequest(epoch, "", otherNodeKey, -5L, 0,
OptionalLong.of(Long.MAX_VALUE), 0)
+ );
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
// invalid cluster id is rejected
- context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid",
otherNodeKey, -5L, 0, 0));
+ context.deliverRequest(
+ context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0,
OptionalLong.of(Long.MAX_VALUE), 0)
+ );
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
@@ -2778,7 +2784,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader
context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest(epoch, 0L, 0);
+ RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now await the fetch timeout and become prospective
context.time.sleep(context.fetchTimeoutMs);
@@ -2818,7 +2824,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader
context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest(epoch, 0L, 0);
+ RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now receive a BeginEpoch from `voter3`
context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
@@ -2915,7 +2921,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
- context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest1.correlationId(),
@@ -2929,7 +2935,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
- context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -2954,7 +2960,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
- context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0,
context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest();
@@ -2964,7 +2970,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
- context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0,
context.client.highWatermark());
}
@ParameterizedTest
@@ -2985,12 +2991,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc)
.build();
- context.discoverLeaderAsObserver(leaderId, epoch);
+ context.discoverLeaderAsObserver(leaderId, epoch,
context.client.highWatermark());
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
- context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest1.correlationId(),
@@ -3004,7 +3010,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
- context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest2.correlationId(),
@@ -3034,12 +3040,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc)
.build();
- context.discoverLeaderAsObserver(leaderId, epoch);
+ context.discoverLeaderAsObserver(leaderId, epoch,
context.client.highWatermark());
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
- context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0,
context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest();
@@ -3049,7 +3055,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
- context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0,
context.client.highWatermark());
context.deliverResponse(
fetchRequest2.correlationId(),
@@ -3727,7 +3733,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
- RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0);
+ RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId,
records, 0L, Errors.NONE);
context.deliverResponse(
@@ -3758,7 +3764,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
- RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0);
+ RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId,
records, 0L, Errors.NONE);
context.deliverResponse(
@@ -3789,7 +3795,7 @@ class KafkaRaftClientTest {
// Receive an empty fetch response
context.pollUntilRequest();
- RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0);
+ RaftRequest.Outbound fetchQuorumRequest =
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
FetchResponseData fetchResponse = context.fetchResponse(
epoch,
otherNodeId,
@@ -3809,7 +3815,7 @@ class KafkaRaftClientTest {
// Receive some records in the next poll, but do not advance high
watermark
context.pollUntilRequest();
Records records = context.buildBatch(0L, epoch, List.of("a", "b"));
- fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
+ fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0,
OptionalLong.of(0));
fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L,
Errors.NONE);
context.deliverResponse(
fetchQuorumRequest.correlationId(),
@@ -3822,7 +3828,7 @@ class KafkaRaftClientTest {
// The next fetch response is empty, but should still advance the high
watermark
context.pollUntilRequest();
- fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch);
+ fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch,
OptionalLong.of(0));
fetchResponse = context.fetchResponse(
epoch,
otherNodeId,
@@ -3974,10 +3980,20 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
- RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
3L, lastEpoch);
+ RaftRequest.Outbound request = context.assertSentFetchRequest(
+ epoch,
+ 3L,
+ lastEpoch,
+ OptionalLong.empty()
+ );
- FetchResponseData response = context.divergingFetchResponse(epoch,
otherNodeId, 2L,
- lastEpoch, 1L);
+ FetchResponseData response = context.divergingFetchResponse(
+ epoch,
+ otherNodeId,
+ 2L,
+ lastEpoch,
+ 1L
+ );
context.deliverResponse(request.correlationId(),
request.destination(), response);
// Poll again to complete truncation
@@ -3987,7 +4003,7 @@ class KafkaRaftClientTest {
// Now we should be fetching
context.client.poll();
- context.assertSentFetchRequest(epoch, 2L, lastEpoch);
+ context.assertSentFetchRequest(epoch, 2L, lastEpoch,
context.client.highWatermark());
}
@ParameterizedTest
@@ -4255,7 +4271,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
// The response does not advance the high watermark
List<String> records1 = List.of("a", "b", "c");
@@ -4276,7 +4292,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
+ context.assertFetchRequestData(fetchRequest, epoch, 3L, 3,
context.client.highWatermark());
// The high watermark advances to include the first batch we fetched
List<String> records2 = List.of("d", "e", "f");
@@ -4514,7 +4530,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest1.destination().id()));
- context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);
+ context.assertFetchRequestData(fetchRequest1, 0, 0L, 0,
context.client.highWatermark());
int leaderEpoch = 5;
@@ -4531,7 +4547,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest2.destination().id());
- context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0,
context.client.highWatermark());
List<String> records = List.of("a", "b", "c");
MemoryRecords batch1 = context.buildBatch(0L, 3, records);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 9f0a5084326..b84c603ac9f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -957,7 +957,7 @@ public final class RaftClientTestContext {
return requests;
}
- private List<RaftResponse.Outbound> drainSentResponses(
+ List<RaftResponse.Outbound> drainSentResponses(
ApiKeys apiKey
) {
List<RaftResponse.Outbound> res = new ArrayList<>();
@@ -1114,23 +1114,20 @@ public final class RaftClientTestContext {
RaftRequest.Outbound assertSentFetchRequest(
int epoch,
long fetchOffset,
- int lastFetchedEpoch
+ int lastFetchedEpoch,
+ OptionalLong highWatermark
) {
List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
assertEquals(1, sentMessages.size());
RaftRequest.Outbound raftRequest = sentMessages.get(0);
- assertFetchRequestData(raftRequest, epoch, fetchOffset,
lastFetchedEpoch);
+ assertFetchRequestData(raftRequest, epoch, fetchOffset,
lastFetchedEpoch, highWatermark);
return raftRequest;
}
- FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
- List<RaftResponse.Outbound> sentMessages =
drainSentResponses(ApiKeys.FETCH);
- assertEquals(
- 1, sentMessages.size(), "Found unexpected sent messages " +
sentMessages);
- RaftResponse.Outbound raftMessage = sentMessages.get(0);
- assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
- FetchResponseData response = (FetchResponseData) raftMessage.data();
+ FetchResponseData.PartitionData
assertFetchResponseData(RaftResponse.Outbound message) {
+ assertEquals(ApiKeys.FETCH.id, message.data().apiKey());
+ FetchResponseData response = (FetchResponseData) message.data();
assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
assertEquals(1, response.responses().size());
@@ -1152,17 +1149,30 @@ public final class RaftClientTestContext {
return partitionResponse;
}
+ FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
+ List<RaftResponse.Outbound> sentMessages =
drainSentResponses(ApiKeys.FETCH);
+ assertEquals(
+ 1,
+ sentMessages.size(),
+ "Found unexpected sent messages " + sentMessages
+ );
+
+ return assertFetchResponseData(sentMessages.get(0));
+ }
+
void assertSentFetchPartitionResponse(Errors topLevelError) {
List<RaftResponse.Outbound> sentMessages =
drainSentResponses(ApiKeys.FETCH);
assertEquals(
- 1, sentMessages.size(), "Found unexpected sent messages " +
sentMessages);
+ 1,
+ sentMessages.size(),
+ "Found unexpected sent messages " + sentMessages
+ );
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data();
assertEquals(topLevelError, Errors.forCode(response.errorCode()));
}
-
MemoryRecords assertSentFetchPartitionResponse(
Errors error,
int epoch,
@@ -1375,7 +1385,8 @@ public final class RaftClientTestContext {
void discoverLeaderAsObserver(
int leaderId,
- int epoch
+ int epoch,
+ OptionalLong highWatermark
) throws Exception {
pollUntilRequest();
RaftRequest.Outbound fetchRequest = assertSentFetchRequest();
@@ -1384,7 +1395,7 @@ public final class RaftClientTestContext {
startingVoters.voterIds().contains(destinationId) ||
bootstrapIds.contains(destinationId),
String.format("id %d is not in sets %s or %s", destinationId,
startingVoters, bootstrapIds)
);
- assertFetchRequestData(fetchRequest, 0, 0L, 0);
+ assertFetchRequestData(fetchRequest, 0, 0L, 0, highWatermark);
deliverResponse(
fetchRequest.correlationId(),
@@ -1672,7 +1683,8 @@ public final class RaftClientTestContext {
RaftRequest.Outbound message,
int epoch,
long fetchOffset,
- int lastFetchedEpoch
+ int lastFetchedEpoch,
+ OptionalLong highWatermark
) {
assertInstanceOf(
FetchRequestData.class,
@@ -1691,6 +1703,7 @@ public final class RaftClientTestContext {
assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
assertEquals(localId.orElse(-1), request.replicaState().replicaId());
+ assertEquals(highWatermark.orElse(-1), fetchPartition.highWatermark());
// Assert that voters have flushed up to the fetch offset
if ((localId.isPresent() &&
startingVoters.voterIds().contains(localId.getAsInt())) ||
@@ -1716,6 +1729,24 @@ public final class RaftClientTestContext {
long fetchOffset,
int lastFetchedEpoch,
int maxWaitTimeMs
+ ) {
+ return fetchRequest(
+ epoch,
+ replicaKey,
+ fetchOffset,
+ lastFetchedEpoch,
+ OptionalLong.of(Long.MAX_VALUE),
+ maxWaitTimeMs
+ );
+ }
+
+ FetchRequestData fetchRequest(
+ int epoch,
+ ReplicaKey replicaKey,
+ long fetchOffset,
+ int lastFetchedEpoch,
+ OptionalLong highWatermark,
+ int maxWaitTimeMs
) {
return fetchRequest(
epoch,
@@ -1723,6 +1754,7 @@ public final class RaftClientTestContext {
replicaKey,
fetchOffset,
lastFetchedEpoch,
+ highWatermark,
maxWaitTimeMs
);
}
@@ -1733,6 +1765,7 @@ public final class RaftClientTestContext {
ReplicaKey replicaKey,
long fetchOffset,
int lastFetchedEpoch,
+ OptionalLong highWatermark,
int maxWaitTimeMs
) {
FetchRequestData request = RaftUtil.singletonFetchRequest(
@@ -1742,7 +1775,8 @@ public final class RaftClientTestContext {
fetchPartition
.setCurrentLeaderEpoch(epoch)
.setLastFetchedEpoch(lastFetchedEpoch)
- .setFetchOffset(fetchOffset);
+ .setFetchOffset(fetchOffset)
+ .setHighWatermark(highWatermark.orElse(-1));
if (raftProtocol.isReconfigSupported()) {
fetchPartition
.setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
@@ -1932,7 +1966,9 @@ public final class RaftClientTestContext {
}
private short fetchRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
+ if (raftProtocol.isHwmInFetchSupported()) {
+ return 18;
+ } else if (raftProtocol.isReconfigSupported()) {
return 17;
} else {
return 16;
@@ -2236,7 +2272,9 @@ public final class RaftClientTestContext {
// dynamic quorum reconfiguration support
KIP_853_PROTOCOL,
// preVote support
- KIP_996_PROTOCOL;
+ KIP_996_PROTOCOL,
+ // HWM in FETCH request support
+ KIP_1166_PROTOCOL;
boolean isKRaftSupported() {
return isAtLeast(KIP_595_PROTOCOL);
@@ -2250,6 +2288,10 @@ public final class RaftClientTestContext {
return isAtLeast(KIP_996_PROTOCOL);
}
+ boolean isHwmInFetchSupported() {
+ return isAtLeast(KIP_1166_PROTOCOL);
+ }
+
private boolean isAtLeast(RaftProtocol otherRpc) {
return this.compareTo(otherRpc) >= 0;
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 9e9ed6f8447..f05db6d187a 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -116,6 +116,9 @@ public enum MetadataVersion {
// Streams groups are early access in 4.1 (KIP-1071).
IBP_4_1_IV0(26, "4.1", "IV0", false),
+ // Send FETCH version 18 in the replica fetcher (KIP-1166)
+ IBP_4_1_IV1(27, "4.1", "IV1", false),
+
// Insert any additional IBP_4_1_IVx versions above this comment, and bump
the feature level of
// IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will
cease to be
// a placeholder.
@@ -126,7 +129,7 @@ public enum MetadataVersion {
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION
ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE
TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.
***
- IBP_4_2_IV0(27, "4.2", "IV0", false),
+ IBP_4_2_IV0(28, "4.2", "IV0", false),
// Enables "streams" groups by default for new clusters (KIP-1071).
//
@@ -134,7 +137,7 @@ public enum MetadataVersion {
// *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS
DEFINITION ALLOWS A STREAMS ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE
TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY.
***
- IBP_4_2_IV1(28, "4.2", "IV1", false);
+ IBP_4_2_IV1(29, "4.2", "IV1", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the
latest version
@@ -264,13 +267,15 @@ public enum MetadataVersion {
}
public short fetchRequestVersion() {
- if (this.isAtLeast(IBP_3_9_IV0)) {
+ if (isAtLeast(IBP_4_1_IV1)) {
+ return 18;
+ } else if (isAtLeast(IBP_3_9_IV0)) {
return 17;
- } else if (this.isAtLeast(IBP_3_7_IV4)) {
+ } else if (isAtLeast(IBP_3_7_IV4)) {
return 16;
- } else if (this.isAtLeast(IBP_3_5_IV1)) {
+ } else if (isAtLeast(IBP_3_5_IV1)) {
return 15;
- } else if (this.isAtLeast(IBP_3_5_IV0)) {
+ } else if (isAtLeast(IBP_3_5_IV0)) {
return 14;
} else {
return 13;
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index c416cfb67d5..23794fde622 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -121,8 +121,16 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version"))
);
// Change expected message to reflect possible MetadataVersion range
1-N (N increases when adding a new version)
- assertEquals("Could not disable metadata.version. The update failed
for all features since the following " +
- "feature had an error: Invalid update version 0 for feature
metadata.version. Local controller 3000 only supports versions 7-28",
commandOutput);
+ assertEquals(
+ String.format(
+ "Could not disable metadata.version. The update failed for all
features since the " +
+ "following feature had an error: Invalid update version 0 for
feature " +
+ "metadata.version. Local controller 3000 only supports
versions %s-%s",
+ MetadataVersion.MINIMUM_VERSION.featureLevel(),
+ MetadataVersion.latestTesting().featureLevel()
+ ),
+ commandOutput
+ );
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1,
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),