dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123094538


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -936,13 +937,13 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
         RaftRequest.Inbound requestMetadata,
         long currentTimeMs
     ) {
-        FetchRequestData request = (FetchRequestData) requestMetadata.data;
+        FetchRequest request = new FetchRequest((FetchRequestData) requestMetadata.data, requestMetadata.apiVersion);

Review Comment:
   I am not sure I understand why we need this change. Could you elaborate?



##########
raft/src/main/java/org/apache/kafka/raft/RaftRequest.java:
##########
@@ -45,11 +45,14 @@ public long createdTimeMs() {
         return createdTimeMs;
     }
 
+
     public static class Inbound extends RaftRequest {
         public final CompletableFuture<RaftResponse.Outbound> completion = new CompletableFuture<>();
+        public final short apiVersion;
 
-        public Inbound(int correlationId, ApiMessage data, long createdTimeMs) {
+        public Inbound(int correlationId, ApiMessage data, long createdTimeMs, short apiVertion) {

Review Comment:
   Same question here.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -48,6 +48,7 @@
 import org.apache.kafka.common.requests.DescribeQuorumResponse;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchRequest;

Review Comment:
   Not related to this line. It would be better to always construct the
   FetchRequest with the new schema
   [here](https://github.com/apache/kafka/blob/a304d91d49b2ca56b51d3a9d6589ef69a6065bbb/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1791)
   and to downgrade in the builder later on.
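
   A minimal sketch of what "construct with the new schema, downgrade in the builder" could look like. All class and field names below (`ReplicaStateSketch`, `FetchDataSketch`, `FetchBuilderSketch`) are hypothetical stand-ins for the generated message classes, not the real Kafka API, and the `-1` default on the top-level replica ID is an assumption based on the nit further down in this review.

```java
// Hypothetical, simplified stand-ins for the generated message classes.
// None of these names come from Kafka; they exist only for this sketch.
final class ReplicaStateSketch {
    int replicaId = -1;      // mirrors the "-1" default in FetchRequest.json
    long replicaEpoch = -1L; // mirrors the "-1" default in FetchRequest.json
}

final class FetchDataSketch {
    int replicaId = -1; // top-level field, only meaningful for versions 0-14 (default assumed)
    ReplicaStateSketch replicaState = new ReplicaStateSketch(); // versions 15+
}

final class FetchBuilderSketch {
    private final int replicaId;
    private final long replicaEpoch;

    FetchBuilderSketch(int replicaId, long replicaEpoch) {
        this.replicaId = replicaId;
        this.replicaEpoch = replicaEpoch;
    }

    // Always populate the new (version 15+) layout first, then downgrade
    // only when the requested version is too old to carry ReplicaState.
    FetchDataSketch build(short version) {
        FetchDataSketch data = new FetchDataSketch();
        data.replicaState.replicaId = replicaId;
        data.replicaState.replicaEpoch = replicaEpoch;
        if (version < 15) {
            data.replicaId = replicaId;                   // move the ID to the legacy field
            data.replicaState = new ReplicaStateSketch(); // reset to defaults; the epoch is dropped
        }
        return data;
    }
}
```

   With this shape, callers never branch on the version themselves; only `build` knows where the replica ID lives for a given version.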



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -302,6 +315,18 @@ public String toString() {
         }
     }
 
+    public static void updateFetchRequestDataReplicaState(FetchRequestData fetchRequestData, int replicaId, long replicaEpoch, short version) {
+        if (version < 15) {
+            fetchRequestData.setReplicaId(replicaId);
+            fetchRequestData.setReplicaState(new ReplicaState());
+        } else {
+            fetchRequestData.setReplicaId(new FetchRequestData().replicaId());

Review Comment:
   nit: Allocating `FetchRequestData` to get `-1` is a bit wasteful here.
   Could we just use `-1`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##########
@@ -25,6 +25,7 @@
 public class FetchParams {
     public final short requestVersion;
     public final int replicaId;
+    public final long brokerEpoch;

Review Comment:
   nit: rename to `replicaEpoch`?



##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -50,14 +50,22 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [
+      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
+        "about": "The replica ID of the follower, of -1 if this request is from a consumer." },

Review Comment:
   nit: `of` -> `or`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##########
@@ -42,6 +44,7 @@ public FetchParams(short requestVersion,
         Objects.requireNonNull(clientMetadata);
         this.requestVersion = requestVersion;
         this.replicaId = replicaId;
+        this.brokerEpoch = brokerEpoch;

Review Comment:
   We need to update `hashCode`, `equals` and `toString` as well.
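
   For illustration, a minimal sketch of the three methods once the new field is included. `FetchParamsSketch` is a hypothetical, trimmed-down stand-in: it keeps only the three fields visible in the hunks above and omits the rest of the real class.

```java
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for FetchParams: only three fields are
// kept here; the real class carries more, which this sketch leaves out.
final class FetchParamsSketch {
    final short requestVersion;
    final int replicaId;
    final long brokerEpoch;

    FetchParamsSketch(short requestVersion, int replicaId, long brokerEpoch) {
        this.requestVersion = requestVersion;
        this.replicaId = replicaId;
        this.brokerEpoch = brokerEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FetchParamsSketch that = (FetchParamsSketch) o;
        // The new field takes part in equality alongside the existing ones.
        return requestVersion == that.requestVersion
            && replicaId == that.replicaId
            && brokerEpoch == that.brokerEpoch;
    }

    @Override
    public int hashCode() {
        // Hash exactly the fields compared in equals().
        return Objects.hash(requestVersion, replicaId, brokerEpoch);
    }

    @Override
    public String toString() {
        return "FetchParamsSketch(requestVersion=" + requestVersion
            + ", replicaId=" + replicaId
            + ", brokerEpoch=" + brokerEpoch + ")";
    }
}
```

   Leaving the field out of `equals` and `hashCode` would make two params objects that differ only in epoch compare as equal, which is easy to miss in tests that assert on equality.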



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -110,11 +110,13 @@ object PartitionTest {
     replicaId: Int,
     maxWaitMs: Long = 0L,
     minBytes: Int = 1,
-    maxBytes: Int = Int.MaxValue
+    maxBytes: Int = Int.MaxValue,
+    brokerEpoch: Long = 1L

Review Comment:
   nit: Could we put `brokerEpoch` after `replicaId`?



##########
clients/src/main/resources/common/message/FetchResponse.json:
##########
@@ -43,7 +43,9 @@
   // Version 13 replaces the topic name field with topic ID (KIP-516).
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaEpoch. No change for response(KIP-903)

Review Comment:
   nit: `Version 15 is the same as version 14.`?



##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -50,14 +50,22 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
       "taggedVersions": "12+", "tag": 0, "ignorable": true,
       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
-    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": "brokerId",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
+    { "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", "tag": 1, "fields": [
+      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
+        "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
+      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
+        "about": "The epoch of this follower." }

Review Comment:
   nit: `..., or -1 if not available`?



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                 .setResponses(topicResponseList));
     }
 
+    public String clusterId() {
+        return data.clusterId();
+    }
+
+    public List<FetchTopic> topics() {
+        return data.topics();
+    }
+
+    public int maxWaitMs() {
+        return data.maxWaitMs();
+    }

Review Comment:
   Are these accessors used anywhere?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -168,7 +168,10 @@ public enum MetadataVersion {
     IBP_3_4_IV0(8, "3.4", "IV0", true),
 
     // Support for tiered storage (KIP-405)
-    IBP_3_5_IV0(9, "3.5", "IV0", false);
+    IBP_3_5_IV0(9, "3.5", "IV0", false),
+
+    // Support for including follower broker epoch in Fetch request (KIP-903).

Review Comment:
   nit: Adds replica epoch to Fetch request (KIP-903)?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -168,7 +168,10 @@ public enum MetadataVersion {
     IBP_3_4_IV0(8, "3.4", "IV0", true),
 
     // Support for tiered storage (KIP-405)
-    IBP_3_5_IV0(9, "3.5", "IV0", false);
+    IBP_3_5_IV0(9, "3.5", "IV0", false),
+
+    // Support for including follower broker epoch in Fetch request (KIP-903).
+    IBP_3_5_IV1(10, "3.5", "IV1", true);

Review Comment:
   Why `true` here? 



