dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1129828078


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -302,6 +320,31 @@ public String toString() {
         }
     }
 
+    public static void updateReplicaStateBasedOnVersion(FetchRequestData 
fetchRequestData, short version) {
+        if (fetchRequestData.replicaId() == 
fetchRequestData.replicaState().replicaId()) {
+            // The only case where these two replica ids are the same is that 
they are both -1. Nothing to update.
+            return;
+        }
+        if (fetchRequestData.replicaId() != -1) {
+            // Using old replicaId.
+            if (version >= 15) {
+                fetchRequestData.setReplicaState(new 
ReplicaState().setReplicaId(fetchRequestData.replicaId()));
+                fetchRequestData.setReplicaId(-1);
+            }
+            return;
+        }
+        // Using replica state
+        if (version < 15) {
+            
fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+            fetchRequestData.setReplicaState(new ReplicaState());
+        }
+    }
+
+    public static int getReplicaIdWithoutVersion(FetchRequestData 
fetchRequestData) {

Review Comment:
   nit: We usually don't prefix getters with `get`. We could just call this one 
`replicaId`.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -228,12 +240,18 @@ public FetchRequest build(short version) {
             }
 
             FetchRequestData fetchRequestData = new FetchRequestData();
-            fetchRequestData.setReplicaId(replicaId);
             fetchRequestData.setMaxWaitMs(maxWait);
             fetchRequestData.setMinBytes(minBytes);
             fetchRequestData.setMaxBytes(maxBytes);
             fetchRequestData.setIsolationLevel(isolationLevel.id());
             fetchRequestData.setForgottenTopicsData(new ArrayList<>());
+            if (version < 15) {
+                fetchRequestData.setReplicaId(replicaId);
+            } else {
+                fetchRequestData.setReplicaState(new ReplicaState()
+                        .setReplicaId(replicaId)
+                        .setReplicaEpoch(replicaEpoch));

Review Comment:
   nit: Could we use 4 spaces to indent this two lines to be consistent with 
similar code in this method?



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -974,7 +974,7 @@ void assertFetchRequestData(
         assertEquals(epoch, fetchPartition.currentLeaderEpoch());
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
         assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
-        assertEquals(localId.orElse(-1), request.replicaId());
+        assertEquals(localId.orElse(-1), request.replicaState().replicaId());

Review Comment:
   Could we extend or add tests to cover the changed in KafkaRaftClient? It 
would be great if we could test the old and the version of the fetch request to 
ensure that everything works as expected.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3290,7 +3290,7 @@ class KafkaApisTest {
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
-    val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, 
fetchDataBuilder)
+    val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, 
fetchDataBuilder)

Review Comment:
   Should we extend tests here as well?



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -103,11 +103,12 @@ class ReplicaFetcherThreadTest {
                                          failedPartitions: FailedPartitions,
                                          replicaMgr: ReplicaManager,
                                          quota: ReplicaQuota,
-                                         leaderEndpointBlockingSend: 
BlockingSend): ReplicaFetcherThread = {
+                                         leaderEndpointBlockingSend: 
BlockingSend,

Review Comment:
   Should we also add tests here to test the old and the new version of the 
fetch request?



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -302,6 +320,31 @@ public String toString() {
         }
     }
 
+    public static void updateReplicaStateBasedOnVersion(FetchRequestData 
fetchRequestData, short version) {

Review Comment:
   I wonder if we could simplify this method a bit. My understanding is that we 
only use it in the raft client where we always set the `ReplicaState` field. 
This means that we should only care about downgrading here if the version does 
not support the `ReplicaState` field. I would also name the method 
`maybeDowngradeReplicaState`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to