[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1136820680 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,35 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw +// UnsupportedOperationException if it is used for upgrading. +public static class SimpleBuilder extends AbstractRequest.Builder { +private final FetchRequestData fetchRequestData; +public SimpleBuilder(FetchRequestData fetchRequestData) { Review Comment: nit: Let's add an empty line before this one. ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,35 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +// It is only used by KafkaRaftClient for downgrading the FetchRequest. Notice that, it will throw +// UnsupportedOperationException if it is used for upgrading. Review Comment: nit: I would remove the second sentence because I find it unclear. What does `upgrading` mean here? ## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ## @@ -198,6 +204,35 @@ public void testForgottenTopics(short version) { } } +@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) +public void testFetchRequestSimpleBuilderDowngrade(short version) { Review Comment: nit: `...FetchStateDowngrade`? 
## core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala: ## @@ -159,6 +162,28 @@ class KafkaNetworkChannelTest { } } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) + def testFetchRequestDowngrade(version: Short): Unit = { +val destinationId = 2 +val destinationNode = new Node(destinationId, "127.0.0.1", 9092) +channel.updateEndpoint(destinationId, new InetAddressSpec( + new InetSocketAddress(destinationNode.host, destinationNode.port))) +sendTestRequest(ApiKeys.FETCH, destinationId) +channel.pollOnce() + +assertEquals(1, client.requests().size()) +val request = client.requests().peek().requestBuilder().build(version) + +if (version < 15) { + assertTrue(request.asInstanceOf[FetchRequest].data().replicaId() == 1) Review Comment: small nit: In Scala, we usually don't put the `()` for accessors. In this test, you can remove them for `data`, `replicaId`, `size`, `replicaState`, etc. ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -983,16 +988,16 @@ private CompletableFuture handleFetchRequest( Errors error = Errors.forException(cause); if (error != Errors.REQUEST_TIMED_OUT) { logger.debug("Failed to handle fetch from {} at {} due to {}", -request.replicaId(), fetchPartition.fetchOffset(), error); +replicaId, fetchPartition.fetchOffset(), error); Review Comment: nit: Let's keep the original indentation please. ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } +// This test mainly focuses on whether the leader state is correctly updated under different fetch version. 
+@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.FETCH) +public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { +int localId = 0; +int otherNodeId = 1; +int epoch = 5; +Set voters = Utils.mkSet(localId, otherNodeId); + +RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + +// First, test with a correct fetch request. +FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeId, 1L, epoch, 0); +FetchRequestData downgradedRequest = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); +context.deliverRequest(downgradedRequest); +context.client.poll(); +context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); +assertEquals(1L, context.log.highWatermark().offset); Review Comment: nit: Other tests use `context.client.highWatermark()`. Should we also use this one? ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -1436,6 +1441,47 @@ public void testInvalidFetchRequest() throws Exception { context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } +// This test
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,33 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +public static class SimpleBuilder extends AbstractRequest.Builder { +private final FetchRequestData fetchRequestData; +public SimpleBuilder(FetchRequestData fetchRequestData) { +super(ApiKeys.FETCH); +this.fetchRequestData = fetchRequestData; +} + +@Override +public FetchRequest build(short version) { +int replicaId = FetchRequest.replicaId(fetchRequestData); +long replicaEpoch = fetchRequestData.replicaState().replicaEpoch(); +if (version < 15) { +fetchRequestData.setReplicaId(replicaId); +fetchRequestData.setReplicaState(new ReplicaState()); +} else { +fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch)); +fetchRequestData.setReplicaId(-1); +} Review Comment: My understanding is that we always use the new format everywhere so we should only care about downgrading, no? If we get a replica id >= 0, we could even consider throwing an UnsupportedVersionException for instance. ## core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala: ## @@ -44,7 +45,10 @@ object KafkaNetworkChannel { case fetchRequest: FetchRequestData => // Since we already have the request, we go through a simplified builder new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) { - override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version) + override def build(version: Short): FetchRequest = { +val builder = new SimpleBuilder(fetchRequest) +new FetchRequest(builder.build(version).data(), version) + } override def toString: String = fetchRequest.toString } Review Comment: You can replace all of this by `new FetchRequest.SimpleBuilder(fetchRequest)`. 
## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -130,10 +131,33 @@ private static Optional optionalEpoch(int rawEpochValue) { } } +public static class SimpleBuilder extends AbstractRequest.Builder { Review Comment: nit: Could we put a comment saying that this is only used by the KafkaRaftClient? ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -983,16 +987,16 @@ private CompletableFuture handleFetchRequest( Errors error = Errors.forException(cause); if (error != Errors.REQUEST_TIMED_OUT) { logger.debug("Failed to handle fetch from {} at {} due to {}", -request.replicaId(), fetchPartition.fetchOffset(), error); +FetchRequest.replicaId(request), fetchPartition.fetchOffset(), error); return buildEmptyFetchResponse(error, Optional.empty()); } } // FIXME: `completionTimeMs`, which can be null logger.trace("Completing delayed fetch from {} starting at offset {} at {}", -request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs); +FetchRequest.replicaId(request), fetchPartition.fetchOffset(), completionTimeMs); -return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds()); +return tryCompleteFetchRequest(FetchRequest.replicaId(request), fetchPartition, time.milliseconds()); Review Comment: nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a variable instead of calling it everywhere? 
## core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala: ## @@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest { assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1)) assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1)) } + +@Test +def testBrokerEpochSupplier(): Unit = { +val tp = new TopicPartition("topic1", 0) +val topicId1 = Uuid.randomUuid() +val log: UnifiedLog = mock(classOf[UnifiedLog]) +val partitionMap = Map( +tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None)) +when(replicaManager.localLogOrException(tp)).thenReturn(log) +when(log.logStartOffset).thenReturn(1) +val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1132627550 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +320,19 @@ public String toString() { } } +// Downgrades the ReplicaState field to be compatible with lower version. +public static void maybeDownGradeReplicaState(FetchRequestData fetchRequestData, short version) { +if (version < 15) { + fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId()); +fetchRequestData.setReplicaState(new ReplicaState()); +} +} + +public static int replicaId(FetchRequestData fetchRequestData) { +return fetchRequestData.replicaId() != -1 ? +fetchRequestData.replicaId() : fetchRequestData.replicaState().replicaId(); Review Comment: nit: Should we keep it on a single line? ## clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java: ## @@ -198,6 +202,24 @@ public void testForgottenTopics(short version) { } } +@ParameterizedTest +@MethodSource("fetchVersions") Review Comment: nit: You can use `@ApiKeyVersionsSource` here. ## core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala: ## @@ -105,14 +108,18 @@ class KafkaNetworkChannel( private val correlationIdCounter = new AtomicInteger(0) private val endpoints = mutable.HashMap.empty[Int, Node] - private val requestThread = new RaftSendThread( + private var requestThread = new RaftSendThread( name = threadNamePrefix + "-outbound-request-thread", networkClient = client, requestTimeoutMs = requestTimeoutMs, time = time, isInterruptible = false ) + def setRequestThread(raftSendThread: RaftSendThread): Unit = { +requestThread = raftSendThread + } Review Comment: Hmm... I would prefer to avoid doing this. In the test, it seems that you could rely on the mocked client instead of doing this. Could you please give it a try?
## core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala: ## @@ -58,7 +58,7 @@ class RemoteLeaderEndPointTest { blockingSend = new MockBlockingSender(offsets = new util.HashMap[TopicPartition, EpochEndOffset](), sourceBroker = sourceBroker, time = time) endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, fetchSessionHandler, -config, replicaManager, QuotaFactory.UnboundedQuota, () => MetadataVersion.MINIMUM_KRAFT_VERSION) +config, replicaManager, QuotaFactory.UnboundedQuota, () => MetadataVersion.MINIMUM_KRAFT_VERSION, () => 1) Review Comment: nit: Should we add a small unit test in this file to ensure that the value is propagated correctly? ## core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala: ## @@ -159,6 +167,29 @@ class KafkaNetworkChannelTest { } } + @ParameterizedTest + @MethodSource(Array("fetchVersions")) + def testFetchRequestDowngrade(version: Short): Unit = { +val destinationId = 2 +val destinationNode = new Node(destinationId, "127.0.0.1", 9092) +channel.updateEndpoint(destinationId, new InetAddressSpec( + new InetSocketAddress(destinationNode.host, destinationNode.port))) +val mockSendThread = mock(classOf[RaftSendThread]) +channel.setRequestThread(mockSendThread) Review Comment: As said earlier, I would rather prefer to avoid this. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3366,8 +3366,10 @@ class KafkaApisTest { when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) -val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder) +val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, fetchDataBuilder) .build() +assertEquals(fetchRequest.replicaEpoch(), -1) +assertEquals(fetchRequest.replicaId(), -1) Review Comment: Those assertions are not really useful here because they don't test what the KafkaApis layer does. Let's remove them. 
## core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala: ## @@ -159,6 +167,29 @@ class KafkaNetworkChannelTest { } } + @ParameterizedTest + @MethodSource(Array("fetchVersions")) Review Comment: nit: You can use `@ApiKeyVersionsSource` as well. ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -208,6 +210,7 @@ class ReplicaFetcherThreadTest { //Assert that truncate to is called exactly once (despite two loops) verify(partition, times(3)).truncateTo(anyLong(), anyBoolean()) +assertTrue(mockBrokerEpochSupplier.getCounter() > 0) Review Comment: It is really weird to add those assertions in random tests. They don't bring much value so I
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1129840701 ## core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala: ## @@ -44,7 +44,10 @@ object KafkaNetworkChannel { case fetchRequest: FetchRequestData => // Since we already have the request, we go through a simplified builder new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) { - override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version) + override def build(version: Short): FetchRequest = { Review Comment: Should we add tests for this one as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1129828078 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +320,31 @@ public String toString() { } } +public static void updateReplicaStateBasedOnVersion(FetchRequestData fetchRequestData, short version) { +if (fetchRequestData.replicaId() == fetchRequestData.replicaState().replicaId()) { +// The only case where these two replica ids are the same is that they are both -1. Nothing to update. +return; +} +if (fetchRequestData.replicaId() != -1) { +// Using old replicaId. +if (version >= 15) { +fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(fetchRequestData.replicaId())); +fetchRequestData.setReplicaId(-1); +} +return; +} +// Using replica state +if (version < 15) { + fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId()); +fetchRequestData.setReplicaState(new ReplicaState()); +} +} + +public static int getReplicaIdWithoutVersion(FetchRequestData fetchRequestData) { Review Comment: nit: We usually don't prefix getters with `get`. We could just call this one `replicaId`. ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -228,12 +240,18 @@ public FetchRequest build(short version) { } FetchRequestData fetchRequestData = new FetchRequestData(); -fetchRequestData.setReplicaId(replicaId); fetchRequestData.setMaxWaitMs(maxWait); fetchRequestData.setMinBytes(minBytes); fetchRequestData.setMaxBytes(maxBytes); fetchRequestData.setIsolationLevel(isolationLevel.id()); fetchRequestData.setForgottenTopicsData(new ArrayList<>()); +if (version < 15) { +fetchRequestData.setReplicaId(replicaId); +} else { +fetchRequestData.setReplicaState(new ReplicaState() +.setReplicaId(replicaId) +.setReplicaEpoch(replicaEpoch)); Review Comment: nit: Could we use 4 spaces to indent these two lines to be consistent with similar code in this method?
## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ## @@ -974,7 +974,7 @@ void assertFetchRequestData( assertEquals(epoch, fetchPartition.currentLeaderEpoch()); assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); -assertEquals(localId.orElse(-1), request.replicaId()); +assertEquals(localId.orElse(-1), request.replicaState().replicaId()); Review Comment: Could we extend or add tests to cover the changes in KafkaRaftClient? It would be great if we could test the old and the new version of the fetch request to ensure that everything works as expected. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3290,7 +3290,7 @@ class KafkaApisTest { when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) -val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, fetchDataBuilder) +val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, fetchDataBuilder) Review Comment: Should we extend tests here as well? ## core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala: ## @@ -103,11 +103,12 @@ class ReplicaFetcherThreadTest { failedPartitions: FailedPartitions, replicaMgr: ReplicaManager, quota: ReplicaQuota, - leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = { + leaderEndpointBlockingSend: BlockingSend, Review Comment: Should we also add tests here to test the old and the new version of the fetch request? ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +320,31 @@ public String toString() { } } +public static void updateReplicaStateBasedOnVersion(FetchRequestData fetchRequestData, short version) { Review Comment: I wonder if we could simplify this method a bit. My understanding is that we only use it in the raft client where we always set the `ReplicaState` field.
This means that we should only care about downgrading here if the version does not support the `ReplicaState` field. I would also name the method `maybeDowngradeReplicaState`. -- This is an automated message from the Apache Git
[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.
dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1123094538 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -936,13 +937,13 @@ private CompletableFuture handleFetchRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs ) { -FetchRequestData request = (FetchRequestData) requestMetadata.data; +FetchRequest request = new FetchRequest((FetchRequestData) requestMetadata.data, requestMetadata.apiVersion); Review Comment: I am not sure I understand why we need this change. Could you elaborate? ## raft/src/main/java/org/apache/kafka/raft/RaftRequest.java: ## @@ -45,11 +45,14 @@ public long createdTimeMs() { return createdTimeMs; } + public static class Inbound extends RaftRequest { public final CompletableFuture completion = new CompletableFuture<>(); +public final short apiVersion; -public Inbound(int correlationId, ApiMessage data, long createdTimeMs) { +public Inbound(int correlationId, ApiMessage data, long createdTimeMs, short apiVertion) { Review Comment: Same question here. ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -48,6 +48,7 @@ import org.apache.kafka.common.requests.DescribeQuorumResponse; import org.apache.kafka.common.requests.EndQuorumEpochRequest; import org.apache.kafka.common.requests.EndQuorumEpochResponse; +import org.apache.kafka.common.requests.FetchRequest; Review Comment: Not related to this line. It would be better to always construct the FetchRequest with the new schema [here](https://github.com/apache/kafka/blob/a304d91d49b2ca56b51d3a9d6589ef69a6065bbb/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1791) and to downgrade in the builder later on.
## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +315,18 @@ public String toString() { } } +public static void updateFetchRequestDataReplicaState(FetchRequestData fetchRequestData, int replicaId, long replicaEpoch, short version) { +if (version < 15) { +fetchRequestData.setReplicaId(replicaId); +fetchRequestData.setReplicaState(new ReplicaState()); +} else { +fetchRequestData.setReplicaId(new FetchRequestData().replicaId()); Review Comment: nit: Allocating `FetchRequestData` to get `-1` is a bit wasteful here. Could we just use `-1`? ## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java: ## @@ -25,6 +25,7 @@ public class FetchParams { public final short requestVersion; public final int replicaId; +public final long brokerEpoch; Review Comment: nit: replicaEpoch? ## clients/src/main/resources/common/message/FetchRequest.json: ## @@ -50,14 +50,22 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, -{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", +{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." 
}, +{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ + { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId", +"about": "The replica ID of the follower, of -1 if this request is from a consumer." }, Review Comment: nit: `of` -> `or`? ## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java: ## @@ -42,6 +44,7 @@ public FetchParams(short requestVersion, Objects.requireNonNull(clientMetadata); this.requestVersion = requestVersion; this.replicaId = replicaId; +this.brokerEpoch = brokerEpoch; Review Comment: We need to update `hashCode`, `equals` and `toString` as well. ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -110,11 +110,13 @@ object PartitionTest { replicaId: Int, maxWaitMs: Long = 0L, minBytes: Int = 1, -maxBytes: Int =