kirktrue commented on code in PR #14444:
URL: https://github.com/apache/kafka/pull/14444#discussion_r1338814653

##########
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##########
@@ -254,8 +265,10 @@ private static boolean matchingTopic(FetchResponseData.FetchableTopicResponse pr
     private static FetchResponseData toMessage(Errors error,
                                                int throttleTimeMs,
                                                int sessionId,
-                                               Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> partIterator) {
+                                               Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> partIterator,
+                                               List<Node> nodeEndpoints) {
         List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
+        FetchResponseData data = new FetchResponseData();

Review Comment:
   nit: can we move the object creation closer to where it's updated and returned at the bottom of the method?


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        this(responses, DEFAULT_THROTTLE_TIME);
+        this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
     }

     /**
-     * Constructor for the latest version
+     * Constructor for versions <= 9
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs));
+        this(toData(responses, throttleTimeMs, Collections.emptyList()));
+    }
+
+    /**
+     * Constructor for the latest version
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTimeMs Time in milliseconds the response was throttled
+     * @param nodeEndpoints List of node endpoints
+     */
+    @Deprecated

Review Comment:
   I'm confused: why is the new constructor marked as `@Deprecated`? If that's intentional, can you add a comment about what should be used instead?
   Thanks.


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+    val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+    var leaderId = -1
+    var leaderEpoch = -1
+    partitionInfoOrError match {
+      case Right(x) =>
+        leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+        leaderEpoch = x.getLeaderEpoch
+      case Left(x) =>
+        debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+        val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+        partitionInfo.foreach { info =>

Review Comment:
   This `foreach` loop is overwriting the `leaderId` and `leaderEpoch` each time. Is that intentional? Is there a benefit to looping vs. just grabbing the last entry in the collection?


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   In fact, the `errorMessage` bit could be redone that way too.


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   In fact, I think that the `StringBuilder` code checks for `null`s in its `append()` method.
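
   For reference, here is a minimal sketch of the `null` handling mentioned above. The class and method names (`NullAppendSketch`, `describe`) are made up for illustration and are not part of the PR. The sketch relies only on the standard behavior of `StringBuilder.append(Object)`, which goes through `String.valueOf(Object)` and therefore appends the text `null` for a null reference.

```java
public class NullAppendSketch {

    // Hypothetical helper: mirrors the shape of the toString() fragment under review,
    // but without an explicit null check around the append.
    static String describe(Object currentLeader) {
        StringBuilder b = new StringBuilder();
        b.append(", currentLeader: ");
        // append(Object) delegates to String.valueOf(Object), so a null
        // reference is rendered as the four characters "null" instead of throwing.
        b.append(currentLeader);
        return b.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(null));      // a null leader
        System.out.println(describe("leader-1")); // any non-null object uses its toString()
    }
}
```

   If the `else` branch in the PR only appends the literal `null`, the guard could probably collapse to a single `b.append(currentLeader)`, assuming the type of `currentLeader` has a suitable `toString()`.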


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -210,6 +238,12 @@ public String toString() {
             b.append(logStartOffset);
             b.append(", recordErrors: ");
             b.append(recordErrors);
+            b.append(", currentLeader: ");
+            if (currentLeader != null) {

Review Comment:
   I _think_ that the following lines could simply be `String.valueOf(currentLeader)`, right?


##########
clients/src/main/resources/common/message/ProduceRequest.json:
##########
@@ -33,7 +33,7 @@
   // Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  "validVersions": "0-10",

Review Comment:
   Does it make sense to add a comment about the version bump?


##########
clients/src/main/resources/common/message/FetchRequest.json:
##########
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.

Review Comment:
   At the risk of proving my ignorance, why do we bump the version number if nothing has changed? Is it so that the request has the same version number as the response (which is bumped)?