kirktrue commented on code in PR #14444:
URL: https://github.com/apache/kafka/pull/14444#discussion_r1338814653


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##########
@@ -254,8 +265,10 @@ private static boolean 
matchingTopic(FetchResponseData.FetchableTopicResponse pr
     private static FetchResponseData toMessage(Errors error,
                                                int throttleTimeMs,
                                                int sessionId,
-                                               
Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> 
partIterator) {
+                                               
Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> 
partIterator,
+                                               List<Node> nodeEndpoints) {
         List<FetchResponseData.FetchableTopicResponse> topicResponseList = new 
ArrayList<>();
+        FetchResponseData data = new FetchResponseData();

Review Comment:
   nit: can we move the object creation closer to where it's updated and 
returned at the bottom of the method?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData 
produceResponseData) {
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        this(responses, DEFAULT_THROTTLE_TIME);
+        this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
     }
 
     /**
-     * Constructor for the latest version
+     * Constructor for versions <= 9
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs));
+        this(toData(responses, throttleTimeMs, Collections.emptyList()));
+    }
+
+    /**
+     * Constructor for the latest version
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTimeMs Time in milliseconds the response was throttled
+     * @param nodeEndpoints List of node endpoints
+     */
+    @Deprecated

Review Comment:
   I'm confused—why is the new constructor is marked as `@Deprecated`? If 
that's intentional, can you add a comment about what should be used instead? 
Thanks.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+    val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+    var leaderId = -1
+    var leaderEpoch = -1
+    partitionInfoOrError match {
+      case Right(x) =>
+          leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+          leaderEpoch = x.getLeaderEpoch
+      case Left(x) =>
+        debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+        val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+        partitionInfo.foreach { info =>

Review Comment:
   This `foreach` loop is overwriting the `leaderId` and `leaderEpoch` each 
time. Is that intentional? Is there a benefit to looping vs. just grabbing the 
last entry in the collection?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   In fact, the `errorMessage` bit could be redone that way too.



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   In fact, I think that the `StringBuilder` code checks for `null`s in its 
`append()` method.



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   I _think_ that the following lines could be simply 
`String.valueOf(currentLeader)`, right? 



##########
clients/src/main/resources/common/message/ProduceRequest.json:
##########
@@ -33,7 +33,7 @@
   // Starting in Version 8, response has RecordErrors and ErrorMessage. See 
KIP-467.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  "validVersions": "0-10",

Review Comment:
   Does it make sense to add a comment about the version bump?



##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. 
(KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.

Review Comment:
   At the risk of proving my ignorance, why do we bump the version number if 
nothing has changed? Is it so that the request the same version number as the 
response (which is bumped)?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData 
produceResponseData) {
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        this(responses, DEFAULT_THROTTLE_TIME);
+        this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
     }
 
     /**
-     * Constructor for the latest version
+     * Constructor for versions <= 9
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs));
+        this(toData(responses, throttleTimeMs, Collections.emptyList()));
+    }
+
+    /**
+     * Constructor for the latest version
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTimeMs Time in milliseconds the response was throttled
+     * @param nodeEndpoints List of node endpoints
+     */
+    @Deprecated

Review Comment:
   I'm confused—why is the new constructor is marked as `@Deprecated`? If 
that's intentional, can you add a comment about what should be used instead? 
Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to