dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1129828078
########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -302,6 +320,31 @@ public String toString() { } } + public static void updateReplicaStateBasedOnVersion(FetchRequestData fetchRequestData, short version) { + if (fetchRequestData.replicaId() == fetchRequestData.replicaState().replicaId()) { + // The only case where these two replica ids are the same is that they are both -1. Nothing to update. + return; + } + if (fetchRequestData.replicaId() != -1) { + // Using old replicaId. + if (version >= 15) { + fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(fetchRequestData.replicaId())); + fetchRequestData.setReplicaId(-1); + } + return; + } + // Using replica state + if (version < 15) { + fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId()); + fetchRequestData.setReplicaState(new ReplicaState()); + } + } + + public static int getReplicaIdWithoutVersion(FetchRequestData fetchRequestData) { Review Comment: nit: We usually don't prefix getters with `get`. We could just call this one `replicaId`. ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -228,12 +240,18 @@ public FetchRequest build(short version) { } FetchRequestData fetchRequestData = new FetchRequestData(); - fetchRequestData.setReplicaId(replicaId); fetchRequestData.setMaxWaitMs(maxWait); fetchRequestData.setMinBytes(minBytes); fetchRequestData.setMaxBytes(maxBytes); fetchRequestData.setIsolationLevel(isolationLevel.id()); fetchRequestData.setForgottenTopicsData(new ArrayList<>()); + if (version < 15) { + fetchRequestData.setReplicaId(replicaId); + } else { + fetchRequestData.setReplicaState(new ReplicaState() + .setReplicaId(replicaId) + .setReplicaEpoch(replicaEpoch)); Review Comment: nit: Could we use 4 spaces to indent this two lines to be consistent with similar code in this method? ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -974,7 +974,7 @@ void assertFetchRequestData( assertEquals(epoch, fetchPartition.currentLeaderEpoch()); assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); - assertEquals(localId.orElse(-1), request.replicaId()); + assertEquals(localId.orElse(-1), request.replicaState().replicaId()); Review Comment: Could we extend or add tests to cover the changed in KafkaRaftClient? It would be great if we could test the old and the version of the fetch request to ensure that everything works as expected. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -3290,7 +3290,7 @@ class KafkaApisTest { when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) - val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder) + val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, fetchDataBuilder) Review Comment: Should we extend tests here as well? ########## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ########## @@ -103,11 +103,12 @@ class ReplicaFetcherThreadTest { failedPartitions: FailedPartitions, replicaMgr: ReplicaManager, quota: ReplicaQuota, - leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = { + leaderEndpointBlockingSend: BlockingSend, Review Comment: Should we also add tests here to test the old and the new version of the fetch request? ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -302,6 +320,31 @@ public String toString() { } } + public static void updateReplicaStateBasedOnVersion(FetchRequestData fetchRequestData, short version) { Review Comment: I wonder if we could simplify this method a bit. My understanding is that we only use it in the raft client where we always set the `ReplicaState` field. This means that we should only care about downgrading here if the version does not support the `ReplicaState` field. I would also name the method `maybeDowngradeReplicaState`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org