dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int 
rawEpochValue) {
         }
     }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. 
Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.
+    public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {

Review Comment:
   nit: Let's add an empty line before this one.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int 
rawEpochValue) {
         }
     }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. 
Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.

Review Comment:
   nit: I would remove the second sentence because I find it unclear. What does 
`upgrading` mean here?



##########
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##########
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderDowngrade(short version) {

Review Comment:
   nit: `...FetchStateDowngrade`?



##########
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##########
@@ -159,6 +162,28 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+  def testFetchRequestDowngrade(version: Short): Unit = {
+    val destinationId = 2
+    val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+    channel.updateEndpoint(destinationId, new InetAddressSpec(
+      new InetSocketAddress(destinationNode.host, destinationNode.port)))
+    sendTestRequest(ApiKeys.FETCH, destinationId)
+    channel.pollOnce()
+
+    assertEquals(1, client.requests().size())
+    val request = client.requests().peek().requestBuilder().build(version)
+
+    if (version < 15) {
+      assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1)

Review Comment:
   small nit: In Scala, we usually don't put the `()` for accessors. In this 
test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -983,16 +988,16 @@ private CompletableFuture<FetchResponseData> 
handleFetchRequest(
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-                        request.replicaId(), fetchPartition.fetchOffset(), 
error);
+                            replicaId, fetchPartition.fetchOffset(), error);

Review Comment:
   nit: Let's keep the original indentation please.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        context.deliverRequest(downgradedRequest);
+        context.client.poll();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
+        assertEquals(1L, context.log.highWatermark().offset);

Review Comment:
   nit: Other tests use `context.client.highWatermark()`. Should we also use 
this one?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        context.deliverRequest(downgradedRequest);
+        context.client.poll();

Review Comment:
   nit: context.pollUntilResponse() seems better here.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        context.deliverRequest(downgradedRequest);

Review Comment:
   nit: Could we assert the relevant replica id based on the version before 
sending the request?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+

Review Comment:
   nit: Could we assert the HWM here? We could perhaps add the following:
   
   ```
           // First poll has no high watermark advance
           context.client.poll();
           assertEquals(OptionalLong.empty(), context.client.highWatermark());
           assertEquals(1L, context.log.endOffset().offset);
   ```



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int 
rawEpochValue) {
         }
     }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. 
Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.
+    public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {
+            super(ApiKeys.FETCH);
+            this.fetchRequestData = fetchRequestData;
+        }
+
+        @Override
+        public FetchRequest build(short version) {
+            if (fetchRequestData.replicaId() >= 0) {
+                throw new UnsupportedOperationException();

Review Comment:
   nit: I would actually use IllegalStateException and we should also put a 
meaningful error message.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int 
rawEpochValue) {
         }
     }
 
+    // It is only used by KafkaRaftClient for downgrading the FetchRequest. 
Notice that, it will throw
+    // UnsupportedOperationException if it is used for upgrading.
+    public static class SimpleBuilder extends 
AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {
+            super(ApiKeys.FETCH);
+            this.fetchRequestData = fetchRequestData;
+        }
+
+        @Override
+        public FetchRequest build(short version) {
+            if (fetchRequestData.replicaId() >= 0) {
+                throw new UnsupportedOperationException();
+            }
+
+            int replicaId = FetchRequest.replicaId(fetchRequestData);
+            if (version < 15) {
+                fetchRequestData.setReplicaId(replicaId);

Review Comment:
   nit: I would rather use `fetchRequestData.replicaState().replicaId()` 
directly because we don't need to check the value of `replicaId`.



##########
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##########
@@ -116,4 +122,24 @@ class RemoteLeaderEndPointTest {
         assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
         assertThrows(classOf[UnknownLeaderEpochException], () => 
endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    def testBrokerEpochSupplier(version: Short): Unit = {
+        val tp = new TopicPartition("topic1", 0)
+        val topicId1 = Uuid.randomUuid()
+        val log = mock(classOf[UnifiedLog])
+        val partitionMap = Map(
+            tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, 
state = Fetching, lastFetchedEpoch = None))
+        when(replicaManager.localLogOrException(tp)).thenReturn(log)
+        when(log.logStartOffset).thenReturn(1)
+        val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = 
endPoint.buildFetch(partitionMap)

Review Comment:
   nit: I would add an empty line before this one as you added one before the 
second block.



##########
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##########
@@ -198,6 +204,35 @@ public void testForgottenTopics(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderDowngrade(short version) {
+        FetchRequestData fetchRequestData = new FetchRequestData();
+        fetchRequestData.setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(1));
+        FetchRequest.SimpleBuilder builder = new 
FetchRequest.SimpleBuilder(fetchRequestData);
+        fetchRequestData = builder.build(version).data();
+
+        assertEquals(1, FetchRequest.replicaId(fetchRequestData));
+
+        if (version < 15) {
+            assertEquals(1, fetchRequestData.replicaId());
+            assertEquals(-1, fetchRequestData.replicaState().replicaId());
+        } else {
+            assertEquals(-1, fetchRequestData.replicaId());
+            assertEquals(1, fetchRequestData.replicaState().replicaId());
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestSimpleBuilderUpgrade(short version) {

Review Comment:
   nit: `testFetchRequestSimpleBuilderReplicaIdNotSupported`?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -958,7 +959,11 @@ private CompletableFuture<FetchResponseData> 
handleFetchRequest(
                 Errors.INVALID_REQUEST, Optional.empty()));
         }
 
-        FetchResponseData response = 
tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs);
+        int replicaId = FetchRequest.replicaId(request);
+        FetchResponseData response = tryCompleteFetchRequest(
+            replicaId,
+            fetchPartition,
+            currentTimeMs);

Review Comment:
   nit: Let's keep the original format.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -983,16 +988,16 @@ private CompletableFuture<FetchResponseData> 
handleFetchRequest(
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to 
{}",
-                        request.replicaId(), fetchPartition.fetchOffset(), 
error);
+                            replicaId, fetchPartition.fetchOffset(), error);
                     return buildEmptyFetchResponse(error, Optional.empty());
                 }
             }
 
             // FIXME: `completionTimeMs`, which can be null
             logger.trace("Completing delayed fetch from {} starting at offset 
{} at {}",
-                request.replicaId(), fetchPartition.fetchOffset(), 
completionTimeMs);
+                    replicaId, fetchPartition.fetchOffset(), completionTimeMs);

Review Comment:
   nit: Let's keep the original indentation please.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
+        context.deliverRequest(downgradedRequest);
+        context.client.poll();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
+        assertEquals(1L, context.log.highWatermark().offset);
+
+        // Next, create a fetch request that is designed to be divergent.

Review Comment:
   I am not entirely convinced by the second part of the test. Basically, it 
relies on the fact that a fetch request with a diverging offset goes into the 
purgatory and is completed when the offset if finally available. I actually 
wonder if this is a bug because I would have expected the fetch request to be 
competed immediately in this case.
   
   I can't really think of a good way to test the second part so I would just 
remove it.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    // This test mainly focuses on whether the leader state is correctly 
updated under different fetch version.
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // First, test with a correct fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeId, 1L, epoch, 0);
+        FetchRequestData downgradedRequest = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();

Review Comment:
   nit: `request` because it is not always downgraded?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to