[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-15 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,35 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+// UnsupportedOperationException if it is used for upgrading.
+public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+private final FetchRequestData fetchRequestData;
+public SimpleBuilder(FetchRequestData fetchRequestData) {

Review Comment:
   nit: Let's add an empty line before this one.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,35 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw
+// UnsupportedOperationException if it is used for upgrading.

Review Comment:
   nit: I would remove the second sentence because I find it unclear. What does 
`upgrading` mean here?



##
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
 }
 }
 
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment:
   nit: `...FetchStateDowngrade`?



##
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+  def testFetchRequestDowngrade(version: Short): Unit = {
+val destinationId = 2
+val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+channel.updateEndpoint(destinationId, new InetAddressSpec(
+  new InetSocketAddress(destinationNode.host, destinationNode.port)))
+sendTestRequest(ApiKeys.FETCH, destinationId)
+channel.pollOnce()
+
+assertEquals(1, client.requests().size())
+val request = client.requests().peek().requestBuilder().build(version)
+
+if (version < 15) {
+  assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1)

Review Comment:
   small nit: In Scala, we usually don't put the `()` for accessors. In this 
test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -983,16 +988,16 @@ private CompletableFuture 
handleFetchRequest(
 Errors error = Errors.forException(cause);
 if (error != Errors.REQUEST_TIMED_OUT) {
 logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-request.replicaId(), fetchPartition.fetchOffset(), 
error);
+replicaId, fetchPartition.fetchOffset(), error);

Review Comment:
   nit: Let's keep the original indentation please.



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
 context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
 }
 
+// This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+int localId = 0;
+int otherNodeId = 1;
+int epoch = 5;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+// First, test with a correct fetch request.
+FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+context.deliverRequest(downgradedRequest);
+context.client.poll();
+context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
+assertEquals(1L, context.log.highWatermark().offset);

Review Comment:
   nit: Other tests use `context.client.highWatermark()`. Should we also use 
this one?



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
 context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
 }
 
+// This test 

[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

2023-03-14 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,33 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+private final FetchRequestData fetchRequestData;
+public SimpleBuilder(FetchRequestData fetchRequestData) {
+super(ApiKeys.FETCH);
+this.fetchRequestData = fetchRequestData;
+}
+
+@Override
+public FetchRequest build(short version) {
+int replicaId = FetchRequest.replicaId(fetchRequestData);
+long replicaEpoch = fetchRequestData.replicaState().replicaEpoch();
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaState(new 
ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch));
+fetchRequestData.setReplicaId(-1);
+}

Review Comment:
   My understanding is that we always use the new format everywhere so we 
should only care about downgrading, no? If we get a replica id >= 0, we could 
even consider throwing an UnsupportedVersionException for instance.
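
A minimal sketch of the downgrade-only idea in the comment above, not the PR's actual code. `ReplicaStateDowngradeSketch`, its nested `ReplicaState` and `FetchData` classes, and `buildForVersion` are hypothetical stand-ins for the generated Kafka classes and the builder, and the JDK's `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException` so the example has no dependencies. The version threshold of 15 is taken from the diff.

```java
final class ReplicaStateDowngradeSketch {
    // Hypothetical stand-in for the generated ReplicaState struct.
    static final class ReplicaState {
        int replicaId = -1;
        long replicaEpoch = -1L;
    }

    // Hypothetical stand-in for FetchRequestData; only the reviewed fields are modeled.
    static final class FetchData {
        int replicaId = -1;                              // legacy top-level field (versions 0-14)
        ReplicaState replicaState = new ReplicaState();  // field added in version 15
    }

    static final short FIRST_REPLICA_STATE_VERSION = 15;

    // Assumes callers always populate ReplicaState (the new format). The legacy
    // field is only written here, when the target version predates ReplicaState.
    static FetchData buildForVersion(FetchData data, short version) {
        if (data.replicaId >= 0) {
            // The caller used the old field, so this would be an "upgrade"; reject it.
            throw new UnsupportedOperationException(
                "Top-level replicaId is set; only downgrading from ReplicaState is supported");
        }
        if (version < FIRST_REPLICA_STATE_VERSION) {
            // Move the id to the legacy field and reset ReplicaState to its defaults.
            // The replica epoch has no legacy equivalent and is dropped.
            data.replicaId = data.replicaState.replicaId;
            data.replicaState = new ReplicaState();
        }
        return data;
    }

    public static void main(String[] args) {
        FetchData data = new FetchData();
        data.replicaState.replicaId = 2;
        data.replicaState.replicaEpoch = 5L;
        FetchData downgraded = buildForVersion(data, (short) 14);
        System.out.println("legacy replicaId = " + downgraded.replicaId);
        System.out.println("replicaState.replicaId = " + downgraded.replicaState.replicaId);
    }
}
```

With this shape there is a single conversion direction to test, and a request that still carries the old field fails fast instead of being silently rewritten.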



##
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##
@@ -44,7 +45,10 @@ object KafkaNetworkChannel {
   case fetchRequest: FetchRequestData =>
 // Since we already have the request, we go through a simplified 
builder
 new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-  override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
+  override def build(version: Short): FetchRequest = {
+val builder = new SimpleBuilder(fetchRequest)
+new FetchRequest(builder.build(version).data(), version)
+  }
   override def toString: String = fetchRequest.toString
 }

Review Comment:
   You can replace all of this by `new 
FetchRequest.SimpleBuilder(fetchRequest)`.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -130,10 +131,33 @@ private static Optional optionalEpoch(int 
rawEpochValue) {
 }
 }
 
+public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {

Review Comment:
   nit: Could we put a comment saying that this is only used by the 
KafkaRaftClient?



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -983,16 +987,16 @@ private CompletableFuture 
handleFetchRequest(
 Errors error = Errors.forException(cause);
 if (error != Errors.REQUEST_TIMED_OUT) {
 logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-request.replicaId(), fetchPartition.fetchOffset(), 
error);
+FetchRequest.replicaId(request), 
fetchPartition.fetchOffset(), error);
 return buildEmptyFetchResponse(error, Optional.empty());
 }
 }
 
 // FIXME: `completionTimeMs`, which can be null
 logger.trace("Completing delayed fetch from {} starting at offset 
{} at {}",
-request.replicaId(), fetchPartition.fetchOffset(), 
completionTimeMs);
+FetchRequest.replicaId(request), fetchPartition.fetchOffset(), 
completionTimeMs);
 
-return tryCompleteFetchRequest(request.replicaId(), 
fetchPartition, time.milliseconds());
+return tryCompleteFetchRequest(FetchRequest.replicaId(request), 
fetchPartition, time.milliseconds());

Review Comment:
   nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a 
variable instead of calling it everywhere?



##
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##
@@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest {
 assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
 assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
 }
+
+@Test
+def testBrokerEpochSupplier(): Unit = {
+val tp = new TopicPartition("topic1", 0)
+val topicId1 = Uuid.randomUuid()
+val log: UnifiedLog = mock(classOf[UnifiedLog])
+val partitionMap = Map(
+tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, 
state = Fetching, lastFetchedEpoch = None))
+when(replicaManager.localLogOrException(tp)).thenReturn(log)
+when(log.logStartOffset).thenReturn(1)
+val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = 

[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-10 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1132627550


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +320,19 @@ public String toString() {
 }
 }
 
+// Downgrades the ReplicaState field to be compatible with lower version.
+public static void maybeDownGradeReplicaState(FetchRequestData 
fetchRequestData, short version) {
+if (version < 15) {
+fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+fetchRequestData.setReplicaState(new ReplicaState());
+}
+}
+
+public static int replicaId(FetchRequestData fetchRequestData) {
+return fetchRequestData.replicaId() != -1 ?
+fetchRequestData.replicaId() : 
fetchRequestData.replicaState().replicaId();

Review Comment:
   nit: Should we keep it on a single line?



##
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##
@@ -198,6 +202,24 @@ public void testForgottenTopics(short version) {
 }
 }
 
+@ParameterizedTest
+@MethodSource("fetchVersions")

Review Comment:
   nit: You can use `@ApiKeyVersionsSource` here.



##
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##
@@ -105,14 +108,18 @@ class KafkaNetworkChannel(
   private val correlationIdCounter = new AtomicInteger(0)
   private val endpoints = mutable.HashMap.empty[Int, Node]
 
-  private val requestThread = new RaftSendThread(
+  private var requestThread = new RaftSendThread(
 name = threadNamePrefix + "-outbound-request-thread",
 networkClient = client,
 requestTimeoutMs = requestTimeoutMs,
 time = time,
 isInterruptible = false
   )
 
+  def setRequestThread(raftSendThread: RaftSendThread): Unit = {
+requestThread = raftSendThread
+  }

Review Comment:
   Hmm, I would prefer to avoid doing this. In the test, it seems that you 
could rely on the mocked client instead. Could you please give it a try?



##
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##
@@ -58,7 +58,7 @@ class RemoteLeaderEndPointTest {
 blockingSend = new MockBlockingSender(offsets = new 
util.HashMap[TopicPartition, EpochEndOffset](),
 sourceBroker = sourceBroker, time = time)
 endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, 
fetchSessionHandler,
-config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION)
+config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION, () => 1)

Review Comment:
   nit: Should we add a small unit test in this file to ensure that the value 
is propagated correctly?



##
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##
@@ -159,6 +167,29 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("fetchVersions"))
+  def testFetchRequestDowngrade(version: Short): Unit = {
+val destinationId = 2
+val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+channel.updateEndpoint(destinationId, new InetAddressSpec(
+  new InetSocketAddress(destinationNode.host, destinationNode.port)))
+val mockSendThread = mock(classOf[RaftSendThread])
+channel.setRequestThread(mockSendThread)

Review Comment:
   As said earlier, I would prefer to avoid this.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3366,8 +3366,10 @@ class KafkaApisTest {
 when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
   any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
-val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, 
fetchDataBuilder)
+val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, 
fetchDataBuilder)
   .build()
+assertEquals(fetchRequest.replicaEpoch(), -1)
+assertEquals(fetchRequest.replicaId(), -1)

Review Comment:
   Those assertions are not really useful here because they don't test what the 
KafkaApis layer does. Let's remove them.



##
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##
@@ -159,6 +167,29 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("fetchVersions"))

Review Comment:
   nit: You can use `@ApiKeyVersionsSource` as well.



##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -208,6 +210,7 @@ class ReplicaFetcherThreadTest {
 
 //Assert that truncate to is called exactly once (despite two loops)
 verify(partition, times(3)).truncateTo(anyLong(), anyBoolean())
+assertTrue(mockBrokerEpochSupplier.getCounter() > 0)

Review Comment:
   It is really weird to add those assertions in random tests. They don't bring 
much value so I 

[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-08 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1129840701


##
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##
@@ -44,7 +44,10 @@ object KafkaNetworkChannel {
   case fetchRequest: FetchRequestData =>
 // Since we already have the request, we go through a simplified 
builder
 new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-  override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
+  override def build(version: Short): FetchRequest = {

Review Comment:
   Should we add tests for this one as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-08 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1129828078


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +320,31 @@ public String toString() {
 }
 }
 
+public static void updateReplicaStateBasedOnVersion(FetchRequestData 
fetchRequestData, short version) {
+if (fetchRequestData.replicaId() == 
fetchRequestData.replicaState().replicaId()) {
+// The only case where these two replica ids are the same is that 
they are both -1. Nothing to update.
+return;
+}
+if (fetchRequestData.replicaId() != -1) {
+// Using old replicaId.
+if (version >= 15) {
+fetchRequestData.setReplicaState(new 
ReplicaState().setReplicaId(fetchRequestData.replicaId()));
+fetchRequestData.setReplicaId(-1);
+}
+return;
+}
+// Using replica state
+if (version < 15) {
+fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+fetchRequestData.setReplicaState(new ReplicaState());
+}
+}
+
+public static int getReplicaIdWithoutVersion(FetchRequestData 
fetchRequestData) {

Review Comment:
   nit: We usually don't prefix getters with `get`. We could just call this one 
`replicaId`.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -228,12 +240,18 @@ public FetchRequest build(short version) {
 }
 
 FetchRequestData fetchRequestData = new FetchRequestData();
-fetchRequestData.setReplicaId(replicaId);
 fetchRequestData.setMaxWaitMs(maxWait);
 fetchRequestData.setMinBytes(minBytes);
 fetchRequestData.setMaxBytes(maxBytes);
 fetchRequestData.setIsolationLevel(isolationLevel.id());
 fetchRequestData.setForgottenTopicsData(new ArrayList<>());
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+} else {
+fetchRequestData.setReplicaState(new ReplicaState()
+.setReplicaId(replicaId)
+.setReplicaEpoch(replicaEpoch));

Review Comment:
   nit: Could we use 4 spaces to indent these two lines to be consistent with 
similar code in this method?



##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -974,7 +974,7 @@ void assertFetchRequestData(
 assertEquals(epoch, fetchPartition.currentLeaderEpoch());
 assertEquals(fetchOffset, fetchPartition.fetchOffset());
 assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
-assertEquals(localId.orElse(-1), request.replicaId());
+assertEquals(localId.orElse(-1), request.replicaState().replicaId());

Review Comment:
   Could we extend or add tests to cover the changes in KafkaRaftClient? It 
would be great if we could test both the old and the new version of the fetch 
request to ensure that everything works as expected.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3290,7 +3290,7 @@ class KafkaApisTest {
 when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
   any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
-val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, 
fetchDataBuilder)
+val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, 
fetchDataBuilder)

Review Comment:
   Should we extend tests here as well?



##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -103,11 +103,12 @@ class ReplicaFetcherThreadTest {
  failedPartitions: FailedPartitions,
  replicaMgr: ReplicaManager,
  quota: ReplicaQuota,
- leaderEndpointBlockingSend: 
BlockingSend): ReplicaFetcherThread = {
+ leaderEndpointBlockingSend: 
BlockingSend,

Review Comment:
   Should we also add tests here to test the old and the new version of the 
fetch request?



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +320,31 @@ public String toString() {
 }
 }
 
+public static void updateReplicaStateBasedOnVersion(FetchRequestData 
fetchRequestData, short version) {

Review Comment:
   I wonder if we could simplify this method a bit. My understanding is that we 
only use it in the raft client where we always set the `ReplicaState` field. 
This means that we should only care about downgrading here if the version does 
not support the `ReplicaState` field. I would also name the method 
`maybeDowngradeReplicaState`.




[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123094538


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -936,13 +937,13 @@ private CompletableFuture 
handleFetchRequest(
 RaftRequest.Inbound requestMetadata,
 long currentTimeMs
 ) {
-FetchRequestData request = (FetchRequestData) requestMetadata.data;
+FetchRequest request = new FetchRequest((FetchRequestData) 
requestMetadata.data, requestMetadata.apiVersion);

Review Comment:
   I am not sure I understand why we need this change. Could you elaborate?



##
raft/src/main/java/org/apache/kafka/raft/RaftRequest.java:
##
@@ -45,11 +45,14 @@ public long createdTimeMs() {
 return createdTimeMs;
 }
 
+
 public static class Inbound extends RaftRequest {
 public final CompletableFuture completion = new 
CompletableFuture<>();
+public final short apiVersion;
 
-public Inbound(int correlationId, ApiMessage data, long createdTimeMs) 
{
+public Inbound(int correlationId, ApiMessage data, long createdTimeMs, 
short apiVertion) {

Review Comment:
   Same question here.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -48,6 +48,7 @@
 import org.apache.kafka.common.requests.DescribeQuorumResponse;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchRequest;

Review Comment:
   Not related to this line. It would be better to always construct the 
FetchRequest with the new schema 
[here](https://github.com/apache/kafka/blob/a304d91d49b2ca56b51d3a9d6589ef69a6065bbb/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1791)
 and to downgrade in the builder later on.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +315,18 @@ public String toString() {
 }
 }
 
+public static void updateFetchRequestDataReplicaState(FetchRequestData 
fetchRequestData, int replicaId, long replicaEpoch, short version) {
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaId(new FetchRequestData().replicaId());

Review Comment:
   nit: Allocating `FetchRequestData` to get `-1` is a bit wasteful here. Could 
we just use `-1`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##
@@ -25,6 +25,7 @@
 public class FetchParams {
 public final short requestVersion;
 public final int replicaId;
+public final long brokerEpoch;

Review Comment:
   nit: replicaEpoch?



##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,22 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return 
UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
 { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
   "taggedVersions": "12+", "tag": 0, "ignorable": true,
   "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
-{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": 
"brokerId",
   "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
+{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [
+  { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": 
"-1", "entityType": "brokerId",
+"about": "The replica ID of the follower, of -1 if this request is 
from a consumer." },

Review Comment:
   nit: `of` -> `or`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##
@@ -42,6 +44,7 @@ public FetchParams(short requestVersion,
 Objects.requireNonNull(clientMetadata);
 this.requestVersion = requestVersion;
 this.replicaId = replicaId;
+this.brokerEpoch = brokerEpoch;

Review Comment:
   We need to update `hashCode`, `equals` and `toString` as well.
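
A minimal sketch of what this comment asks for, assuming a trimmed-down value class. `FetchParamsSketch` is a hypothetical stand-in that keeps only three of the real `FetchParams` fields, and it uses the name `replicaEpoch` suggested in the earlier nit rather than `brokerEpoch`. The point is that a newly added field must take part in all three methods, otherwise two instances that differ only in the epoch would compare equal.

```java
import java.util.Objects;

// Hypothetical, reduced stand-in for FetchParams; the real class has more fields.
final class FetchParamsSketch {
    final short requestVersion;
    final int replicaId;
    final long replicaEpoch; // the newly added field

    FetchParamsSketch(short requestVersion, int replicaId, long replicaEpoch) {
        this.requestVersion = requestVersion;
        this.replicaId = replicaId;
        this.replicaEpoch = replicaEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FetchParamsSketch)) return false;
        FetchParamsSketch that = (FetchParamsSketch) o;
        return requestVersion == that.requestVersion
            && replicaId == that.replicaId
            && replicaEpoch == that.replicaEpoch; // new field participates in equality
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals() to keep the equals/hashCode contract.
        return Objects.hash(requestVersion, replicaId, replicaEpoch);
    }

    @Override
    public String toString() {
        return "FetchParamsSketch("
            + "requestVersion=" + requestVersion
            + ", replicaId=" + replicaId
            + ", replicaEpoch=" + replicaEpoch
            + ")";
    }

    public static void main(String[] args) {
        FetchParamsSketch a = new FetchParamsSketch((short) 15, 1, 5L);
        FetchParamsSketch b = new FetchParamsSketch((short) 15, 1, 6L);
        System.out.println(a + " equals " + b + ": " + a.equals(b));
    }
}
```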



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -110,11 +110,13 @@ object PartitionTest {
 replicaId: Int,
 maxWaitMs: Long = 0L,
 minBytes: Int = 1,
-maxBytes: Int =