dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,33 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         }
     }
 
+    public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {
+            super(ApiKeys.FETCH);
+            this.fetchRequestData = fetchRequestData;
+        }
+
+        @Override
+        public FetchRequest build(short version) {
+            int replicaId = FetchRequest.replicaId(fetchRequestData);
+            long replicaEpoch = fetchRequestData.replicaState().replicaEpoch();
+            if (version < 15) {
+                fetchRequestData.setReplicaId(replicaId);
+                fetchRequestData.setReplicaState(new ReplicaState());
+            } else {
+                fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch));
+                fetchRequestData.setReplicaId(-1);
+            }

Review Comment:
   My understanding is that we always use the new format everywhere so we 
should only care about downgrading, no? If we get a replica id >= 0, we could 
even consider throwing an UnsupportedVersionException for instance.
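
One way to read this suggestion, as a minimal sketch rather than the actual Kafka code: the builder assumes callers always populate the new `ReplicaState`, only rewrites the data when downgrading below version 15, and rejects input that arrives with the legacy top-level id already set. `FetchData` and `ReplicaState` below are hypothetical stand-ins that model only the fields discussed here, and `IllegalArgumentException` stands in for the `UnsupportedVersionException` mentioned above.

```java
// Hypothetical stand-ins for Kafka's FetchRequestData and ReplicaState;
// they model only the fields discussed in this review.
final class DowngradeOnlySketch {
    static final class ReplicaState {
        int replicaId = -1;
        long replicaEpoch = -1L;
    }

    static final class FetchData {
        int replicaId = -1;                              // legacy top-level field
        ReplicaState replicaState = new ReplicaState();  // field used from version 15 on
    }

    // Assumes callers always populate replicaState (the new format).
    // Returns a copy so the same input can be built for several versions.
    static FetchData build(FetchData data, short version) {
        if (data.replicaId >= 0) {
            // Stand-in for the UnsupportedVersionException suggested in the review.
            throw new IllegalArgumentException(
                "legacy replicaId must not be set by callers: " + data.replicaId);
        }
        FetchData out = new FetchData();
        if (version < 15) {
            // Downgrade: carry the id in the legacy field, leave ReplicaState at defaults.
            out.replicaId = data.replicaState.replicaId;
        } else {
            // New format: copy ReplicaState through unchanged.
            out.replicaState.replicaId = data.replicaState.replicaId;
            out.replicaState.replicaEpoch = data.replicaState.replicaEpoch;
        }
        return out;
    }
}
```

Returning a copy instead of mutating the input is a choice of this sketch; with in-place mutation, a second `build` call after a downgrade would trip the legacy-id check.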



##########
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##########
@@ -44,7 +45,10 @@ object KafkaNetworkChannel {
       case fetchRequest: FetchRequestData =>
         // Since we already have the request, we go through a simplified builder
         new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-          override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
+          override def build(version: Short): FetchRequest = {
+            val builder = new SimpleBuilder(fetchRequest)
+            new FetchRequest(builder.build(version).data(), version)
+          }
           override def toString: String = fetchRequest.toString
         }

Review Comment:
   You can replace all of this with
   `new FetchRequest.SimpleBuilder(fetchRequest)`.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,33 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         }
     }
 
+    public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {

Review Comment:
   nit: Could we put a comment saying that this is only used by the 
KafkaRaftClient?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -983,16 +987,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to {}",
-                        request.replicaId(), fetchPartition.fetchOffset(), error);
+                        FetchRequest.replicaId(request), fetchPartition.fetchOffset(), error);
                     return buildEmptyFetchResponse(error, Optional.empty());
                 }
             }
 
             // FIXME: `completionTimeMs`, which can be null
             logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
-                request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs);
+                FetchRequest.replicaId(request), fetchPartition.fetchOffset(), completionTimeMs);
 
-            return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds());
+            return tryCompleteFetchRequest(FetchRequest.replicaId(request), fetchPartition, time.milliseconds());

Review Comment:
   nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a 
variable instead of calling it everywhere?



##########
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##########
@@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest {
         assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
         assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
     }
+
+    @Test
+    def testBrokerEpochSupplier(): Unit = {
+        val tp = new TopicPartition("topic1", 0)
+        val topicId1 = Uuid.randomUuid()
+        val log: UnifiedLog = mock(classOf[UnifiedLog])
+        val partitionMap = Map(
+            tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None))
+        when(replicaManager.localLogOrException(tp)).thenReturn(log)
+        when(log.logStartOffset).thenReturn(1)
+        val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = endPoint.buildFetch(partitionMap)
+        assertTrue(partitionsWithError.isEmpty)
+        assertEquals(1L, fetchRequestOpt.get.fetchRequest.build(15).replicaEpoch())

Review Comment:
   nit: 1) Should we test all versions? 2) Could we bump the replica epoch 
after this point, build a new fetch request, and assert it again?
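
Both points could be covered by a loop over versions that asserts once, bumps the epoch, and asserts again. The sketch below is only an illustration with hypothetical names: `buildReplicaEpoch` stands in for building a fetch request and reading its replica epoch, and it assumes versions below 15 report the default of -1 while version 15 and later report whatever the supplier returns at build time. The version range passed in is likewise an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class EpochSupplierSketch {
    // Hypothetical stand-in for "build the request at this version and read its
    // replica epoch". Assumes versions below 15 carry the default of -1.
    static long buildReplicaEpoch(short version, LongSupplier epochSupplier) {
        return version >= 15 ? epochSupplier.getAsLong() : -1L;
    }

    // Checks one pass over [oldest, latest] against the epoch currently supplied.
    static boolean matches(short oldest, short latest, LongSupplier supplier, long current) {
        for (short v = oldest; v <= latest; v++) {
            long expected = v >= 15 ? current : -1L;
            if (buildReplicaEpoch(v, supplier) != expected) {
                return false;
            }
        }
        return true;
    }

    // Asserts for all versions, bumps the epoch, then asserts again.
    static boolean holdsForAllVersions(short oldest, short latest) {
        AtomicLong epoch = new AtomicLong(1L);
        if (!matches(oldest, latest, epoch::get, 1L)) {
            return false;
        }
        epoch.set(2L); // bump the replica epoch after the first round of builds
        return matches(oldest, latest, epoch::get, 2L);
    }
}
```

The second pass is the part that exercises the supplier: a builder that captured the epoch once at construction would still report 1 after the bump.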



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,38 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestVersionHandling(short version) throws Exception {

Review Comment:
   I need to take a deeper look into this one.


