dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680
########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) { } } + // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw + // UnsupportedOperationException if it is used for upgrading. + public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> { + private final FetchRequestData fetchRequestData; + public SimpleBuilder(FetchRequestData fetchRequestData) { Review Comment: nit: Let's add an empty line before this one. ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) { } } + // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw + // UnsupportedOperationException if it is used for upgrading. Review Comment: nit: I would remove the second sentence because I find it unclear. What does `upgrading` mean here? ########## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ########## @@ -198,6 +204,35 @@ public void testForgottenTopics(short version) { } } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testFetchRequestSimpleBuilderDowngrade(short version) { Review Comment: nit: `...FetchStateDowngrade`? ########## core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala: ########## @@ -159,6 +162,28 @@ class KafkaNetworkChannelTest { } } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + def testFetchRequestDowngrade(version: Short): Unit = { + val destinationId = 2 + val destinationNode = new Node(destinationId, "127.0.0.1", 9092) + channel.updateEndpoint(destinationId, new InetAddressSpec( + new InetSocketAddress(destinationNode.host, destinationNode.port))) + sendTestRequest(ApiKeys.FETCH, destinationId) + channel.pollOnce() + + assertEquals(1, client.requests().size()) + val request = client.requests().peek().requestBuilder().build(version) + + if (version < 15) { + assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1) Review Comment: small nit: In Scala, we usually don't put the `()` for accessors. In this test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -983,16 +988,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( Errors error = Errors.forException(cause); if (error != Errors.REQUEST_TIMED_OUT) { logger.debug("Failed to handle fetch from {} at {} due to {}", - request.replicaId(), fetchPartition.fetchOffset(), error); + replicaId, fetchPartition.fetchOffset(), error); Review Comment: nit: Let's keep the original indentation please. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // First, test with a correct fetch request. + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); + FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); + context.deliverRequest(downgradedRequest); + context.client.poll(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + assertEquals(1L, context.log.highWatermark().offset); Review Comment: nit: Other tests use `context.client.highWatermark()`. Should we also use this one? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // First, test with a correct fetch request. + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); + FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); + context.deliverRequest(downgradedRequest); + context.client.poll(); Review Comment: nit: context.pollUntilResponse() seems better here. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // First, test with a correct fetch request. + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); + FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); + context.deliverRequest(downgradedRequest); Review Comment: nit: Could we assert the relevant replica id based on the version before sending the request? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + Review Comment: nit: Could we assert the HWM here? We could perhaps add the following: ``` // First poll has no high watermark advance context.client.poll(); assertEquals(OptionalLong.empty(), context.client.highWatermark()); assertEquals(1L, context.log.endOffset().offset); ``` ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) { } } + // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw + // UnsupportedOperationException if it is used for upgrading. + public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> { + private final FetchRequestData fetchRequestData; + public SimpleBuilder(FetchRequestData fetchRequestData) { + super(ApiKeys.FETCH); + this.fetchRequestData = fetchRequestData; + } + + @Override + public FetchRequest build(short version) { + if (fetchRequestData.replicaId() >= 0) { + throw new UnsupportedOperationException(); Review Comment: nit: I would actually use IllegalStateException and we should also put a meaningful error message. ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -130,10 +131,35 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) { } } + // It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw + // UnsupportedOperationException if it is used for upgrading. + public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> { + private final FetchRequestData fetchRequestData; + public SimpleBuilder(FetchRequestData fetchRequestData) { + super(ApiKeys.FETCH); + this.fetchRequestData = fetchRequestData; + } + + @Override + public FetchRequest build(short version) { + if (fetchRequestData.replicaId() >= 0) { + throw new UnsupportedOperationException(); + } + + int replicaId = FetchRequest.replicaId(fetchRequestData); + if (version < 15) { + fetchRequestData.setReplicaId(replicaId); Review Comment: nit: I would rather use `fetchRequestData.replicaState().replicaId()` directly because we don't need to check the value of `replicaId`. ########## core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala: ########## @@ -116,4 +122,24 @@ class RemoteLeaderEndPointTest { assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1)) assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1)) } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + def testBrokerEpochSupplier(version: Short): Unit = { + val tp = new TopicPartition("topic1", 0) + val topicId1 = Uuid.randomUuid() + val log = mock(classOf[UnifiedLog]) + val partitionMap = Map( + tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None)) + when(replicaManager.localLogOrException(tp)).thenReturn(log) + when(log.logStartOffset).thenReturn(1) + val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = endPoint.buildFetch(partitionMap) Review Comment: nit: I would add an empty line before this one as you added one before the second block. ########## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ########## @@ -198,6 +204,35 @@ public void testForgottenTopics(short version) { } } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testFetchRequestSimpleBuilderDowngrade(short version) { + FetchRequestData fetchRequestData = new FetchRequestData(); + fetchRequestData.setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1)); + FetchRequest.SimpleBuilder builder = new FetchRequest.SimpleBuilder(fetchRequestData); + fetchRequestData = builder.build(version).data(); + + assertEquals(1, FetchRequest.replicaId(fetchRequestData)); + + if (version < 15) { + assertEquals(1, fetchRequestData.replicaId()); + assertEquals(-1, fetchRequestData.replicaState().replicaId()); + } else { + assertEquals(-1, fetchRequestData.replicaId()); + assertEquals(1, fetchRequestData.replicaState().replicaId()); + } + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testFetchRequestSimpleBuilderUpgrade(short version) { Review Comment: nit: `testFetchRequestSimpleBuilderReplicaIdNotSupported`? ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -958,7 +959,11 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( Errors.INVALID_REQUEST, Optional.empty())); } - FetchResponseData response = tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs); + int replicaId = FetchRequest.replicaId(request); + FetchResponseData response = tryCompleteFetchRequest( + replicaId, + fetchPartition, + currentTimeMs); Review Comment: nit: Let's keep the original format. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -983,16 +988,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( Errors error = Errors.forException(cause); if (error != Errors.REQUEST_TIMED_OUT) { logger.debug("Failed to handle fetch from {} at {} due to {}", - request.replicaId(), fetchPartition.fetchOffset(), error); + replicaId, fetchPartition.fetchOffset(), error); return buildEmptyFetchResponse(error, Optional.empty()); } } // FIXME: `completionTimeMs`, which can be null logger.trace("Completing delayed fetch from {} starting at offset {} at {}", - request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs); + replicaId, fetchPartition.fetchOffset(), completionTimeMs); Review Comment: nit: Let's keep the original indentation please. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // First, test with a correct fetch request. + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); + FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); + context.deliverRequest(downgradedRequest); + context.client.poll(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + assertEquals(1L, context.log.highWatermark().offset); + + // Next, create a fetch request that is designed to be divergent. Review Comment: I am not entirely convinced by the second part of the test. Basically, it relies on the fact that a fetch request with a diverging offset goes into the purgatory and is completed when the offset if finally available. I actually wonder if this is a bug because I would have expected the fetch request to be competed immediately in this case. I can't really think of a good way to test the second part so I would just remove it. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } + // This test mainly focuses on whether the leader state is correctly updated under different fetch version. + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // First, test with a correct fetch request. + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); + FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); Review Comment: nit: `request` because it is not always downgraded? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org