dajac commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1123094538
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -936,13 +937,13 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs ) { - FetchRequestData request = (FetchRequestData) requestMetadata.data; + FetchRequest request = new FetchRequest((FetchRequestData) requestMetadata.data, requestMetadata.apiVersion); Review Comment: I am not sure to understand why we need this change. Could you elaborate? ########## raft/src/main/java/org/apache/kafka/raft/RaftRequest.java: ########## @@ -45,11 +45,14 @@ public long createdTimeMs() { return createdTimeMs; } + public static class Inbound extends RaftRequest { public final CompletableFuture<RaftResponse.Outbound> completion = new CompletableFuture<>(); + public final short apiVersion; - public Inbound(int correlationId, ApiMessage data, long createdTimeMs) { + public Inbound(int correlationId, ApiMessage data, long createdTimeMs, short apiVertion) { Review Comment: Same question here. ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -48,6 +48,7 @@ import org.apache.kafka.common.requests.DescribeQuorumResponse; import org.apache.kafka.common.requests.EndQuorumEpochRequest; import org.apache.kafka.common.requests.EndQuorumEpochResponse; +import org.apache.kafka.common.requests.FetchRequest; Review Comment: Not related to this line. It would be better to always construct the FetchRequest with the new schema [here](https://github.com/apache/kafka/blob/a304d91d49b2ca56b51d3a9d6589ef69a6065bbb/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1791) and to downgrade in the builder later on. ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -302,6 +315,18 @@ public String toString() { } } + public static void updateFetchRequestDataReplicaState(FetchRequestData fetchRequestData, int replicaId, long replicaEpoch, short version) { + if (version < 15) { + fetchRequestData.setReplicaId(replicaId); + fetchRequestData.setReplicaState(new ReplicaState()); + } else { + fetchRequestData.setReplicaId(new FetchRequestData().replicaId()); Review Comment: nit: Allocating `FetchRequestData` to get `-1` is a bit wasteful here. Could we just use `-1`? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java: ########## @@ -25,6 +25,7 @@ public class FetchParams { public final short requestVersion; public final int replicaId; + public final long brokerEpoch; Review Comment: nit: replicaEpoch? ########## clients/src/main/resources/common/message/FetchRequest.json: ########## @@ -50,14 +50,22 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, - { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", + { "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, + { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ + { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId", + "about": "The replica ID of the follower, of -1 if this request is from a consumer." }, Review Comment: nit: `of` -> `or`? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java: ########## @@ -42,6 +44,7 @@ public FetchParams(short requestVersion, Objects.requireNonNull(clientMetadata); this.requestVersion = requestVersion; this.replicaId = replicaId; + this.brokerEpoch = brokerEpoch; Review Comment: We need to update `hashCode`, `equals` and `toString` as well. ########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ########## @@ -110,11 +110,13 @@ object PartitionTest { replicaId: Int, maxWaitMs: Long = 0L, minBytes: Int = 1, - maxBytes: Int = Int.MaxValue + maxBytes: Int = Int.MaxValue, + brokerEpoch: Long = 1L Review Comment: nit: Could we put `brokerEpoch` after `replicaId`? ########## clients/src/main/resources/common/message/FetchResponse.json: ########## @@ -43,7 +43,9 @@ // Version 13 replaces the topic name field with topic ID (KIP-516). // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaEpoch. No change for response(KIP-903) Review Comment: nit: `Version 15 is the same as version 14.`? ########## clients/src/main/resources/common/message/FetchRequest.json: ########## @@ -50,14 +50,22 @@ // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code. // // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405) - "validVersions": "0-14", + // + // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId (KIP-903) + "validVersions": "0-15", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, - { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", + { "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, + { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [ + { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId", + "about": "The replica ID of the follower, of -1 if this request is from a consumer." }, + { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1", + "about": "The epoch of this follower." } Review Comment: nit: `..., or -1 if not available`? ########## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ########## @@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setResponses(topicResponseList)); } + public String clusterId() { + return data.clusterId(); + } + + public List<FetchTopic> topics() { + return data.topics(); + } + + public int maxWaitMs() { + return data.maxWaitMs(); + } Review Comment: Do we use those ones anywhere? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -168,7 +168,10 @@ public enum MetadataVersion { IBP_3_4_IV0(8, "3.4", "IV0", true), // Support for tiered storage (KIP-405) - IBP_3_5_IV0(9, "3.5", "IV0", false); + IBP_3_5_IV0(9, "3.5", "IV0", false), + + // Support for including follower broker epoch in Fetch request (KIP-903). Review Comment: nit: Adds replica epoch to Fetch request (KIP-903)? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -168,7 +168,10 @@ public enum MetadataVersion { IBP_3_4_IV0(8, "3.4", "IV0", true), // Support for tiered storage (KIP-405) - IBP_3_5_IV0(9, "3.5", "IV0", false); + IBP_3_5_IV0(9, "3.5", "IV0", false), + + // Support for including follower broker epoch in Fetch request (KIP-903). + IBP_3_5_IV1(10, "3.5", "IV1", true); Review Comment: Why `true` here? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -168,7 +168,10 @@ public enum MetadataVersion { IBP_3_4_IV0(8, "3.4", "IV0", true), // Support for tiered storage (KIP-405) - IBP_3_5_IV0(9, "3.5", "IV0", false); + IBP_3_5_IV0(9, "3.5", "IV0", false), + + // Support for including follower broker epoch in Fetch request (KIP-903). + IBP_3_5_IV1(10, "3.5", "IV1", true); Review Comment: Why `true` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org