Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367126017


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   > > Since we are using MV/IBP bumps for fetches, a simple thing to do would be to pick up the newest MV for this release and include the fetch bump here
   
   > ok, so my understanding is instead of bumping IBP to IBP_3_7_IV1 we would wait for the release of IBP_3_8_IV0 to bump the fetch version here to 16.
   
   I should have made this a question. I confused myself with the release versions: I think we can use IBP_3_7_IV0 to bump the fetch version to 16, since it hasn't been released yet. Let me know if that makes sense to you.
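A minimal sketch of how an ordered MetadataVersion-style enum can gate the fetch version, assuming (as proposed above) that the still-unreleased IBP_3_7_IV0 is the first version to send FETCH v16. `FetchVersionGate` and its deliberately incomplete list of constants are hypothetical stand-ins, not Kafka's `MetadataVersion`.

```java
// Hypothetical stand-in for MetadataVersion: only the declaration order of the
// constants matters, and this list is deliberately incomplete.
enum FetchVersionGate {
    IBP_3_5_IV1,
    IBP_3_6_IV2,
    IBP_3_7_IV0;

    // Enum constants compare by declaration order, which gives "isAtLeast" semantics.
    boolean isAtLeast(FetchVersionGate other) {
        return this.compareTo(other) >= 0;
    }

    // Check the newest gate first. Assumption: IBP_3_7_IV0 is the first version
    // allowed to send FETCH v16; everything older in this sketch sends v15.
    short fetchRequestVersion() {
        if (isAtLeast(IBP_3_7_IV0)) {
            return 16;
        }
        return 15; // every remaining constant here is at least IBP_3_5_IV1
    }
}
```

The design choice this illustrates: tying the bump to a version that has not shipped yet means no already-released IBP changes which fetch version it sends.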



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367119960


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -98,13 +116,20 @@ private static ProduceResponseData toData(Map
 .setLogAppendTimeMs(response.logAppendTime)
 .setErrorMessage(response.errorMessage)
 .setErrorCode(response.error.code())
+.setCurrentLeader(response.currentLeader != null ? response.currentLeader : new LeaderIdAndEpoch())

Review Comment:
   Based on the change I made in handleProduceRequest, I don't think currentLeader can be null anymore, so I removed this check.
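One way to make "currentLeader can't be null" hold structurally is to enforce it once at construction, so serialization can copy the field without a null check. A minimal sketch; `LeaderIdAndEpochSketch` and `PartitionResultSketch` are hypothetical stand-ins, and only the -1 defaults come from the ProduceResponse.json schema in this PR.

```java
import java.util.Objects;

// Hypothetical stand-in for ProduceResponseData.LeaderIdAndEpoch. The -1 defaults
// follow the "default": "-1" values in the ProduceResponse.json schema in this PR.
final class LeaderIdAndEpochSketch {
    int leaderId = -1;
    int leaderEpoch = -1;

    LeaderIdAndEpochSketch setLeaderId(int leaderId) {
        this.leaderId = leaderId;
        return this;
    }

    LeaderIdAndEpochSketch setLeaderEpoch(int leaderEpoch) {
        this.leaderEpoch = leaderEpoch;
        return this;
    }
}

// Hypothetical per-partition result. The "currentLeader is never null" invariant
// is enforced once, in the constructor, instead of at every serialization site.
final class PartitionResultSketch {
    final LeaderIdAndEpochSketch currentLeader;

    PartitionResultSketch(LeaderIdAndEpochSketch currentLeader) {
        this.currentLeader = Objects.requireNonNull(currentLeader, "currentLeader");
    }

    // Serialization can copy the field directly; no "!= null ? ... : new ..." needed.
    LeaderIdAndEpochSketch toWire() {
        return currentLeader;
    }
}
```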






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367118129


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -156,7 +182,11 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long
 }
 
 public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
-this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage);
+this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
+}
+
+public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
+this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());

Review Comment:
   I think I addressed this: the currentLeader parameter was unused, so I removed it and made it default to a new LeaderIdAndEpoch. I also removed the ProduceResponseData prefix.
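The constructor chain described here, where every overload funnels into one full constructor and supplies a fresh default leader instead of null, can be sketched as follows. `PartitionResponseSketch` is hypothetical and simplified: record errors are plain strings, and fields such as error, logAppendTime, and logStartOffset are omitted.

```java
import java.util.List;

// Hypothetical, trimmed-down PartitionResponse; not Kafka's real signatures.
final class PartitionResponseSketch {
    static final long INVALID_OFFSET = -1L;

    // Stand-in for LeaderIdAndEpoch with the schema defaults (-1, -1).
    static final class Leader {
        final int id;
        final int epoch;
        Leader() { this(-1, -1); }
        Leader(int id, int epoch) { this.id = id; this.epoch = epoch; }
    }

    final long baseOffset;
    final long lastOffset;
    final List<String> recordErrors;
    final Leader currentLeader;

    // Convenience overload: no lastOffset known, so delegate with INVALID_OFFSET.
    PartitionResponseSketch(long baseOffset, List<String> recordErrors) {
        this(baseOffset, INVALID_OFFSET, recordErrors);
    }

    // Overload kept for existing callers: fills in a default leader rather than null.
    PartitionResponseSketch(long baseOffset, long lastOffset, List<String> recordErrors) {
        this(baseOffset, lastOffset, recordErrors, new Leader());
    }

    // Single full constructor: every other overload ends up here.
    PartitionResponseSketch(long baseOffset, long lastOffset, List<String> recordErrors, Leader currentLeader) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
        this.recordErrors = recordErrors;
        this.currentLeader = currentLeader;
    }
}
```

Keeping one full constructor means the default is chosen in exactly one place; the trade-off raised in review is that each new optional field tends to add another overload.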






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367116597


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -102,6 +104,15 @@
   "about": "The preferred read replica for the consumer to use on its next fetch request"},
 { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
   ]}
+]},

Review Comment:
   It looks like tags are scoped to the list level, so this isn't really the same tag. They also need to be contiguous within their scope, so I get an error if I try to tag NodeEndpoints with anything other than 0.
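The rule described in this comment (tags are checked per scope, and within a scope they must be unique and start at 0 with no gaps) can be sketched as a small validator. `TaggedFieldScopes` is hypothetical: it restates the rule as described here and is not Kafka's message generator code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical validator: each map entry is one scope (a top-level message or one
// nested field list) mapped to the tags declared directly in that scope.
final class TaggedFieldScopes {
    static void validate(Map<String, List<Integer>> tagsByScope) {
        for (Map.Entry<String, List<Integer>> scope : tagsByScope.entrySet()) {
            List<Integer> sorted = new ArrayList<>(scope.getValue());
            Collections.sort(sorted);
            for (int i = 0; i < sorted.size(); i++) {
                // After sorting, a gap or a duplicate shows up as sorted[i] != i.
                if (sorted.get(i) != i) {
                    throw new IllegalArgumentException("scope " + scope.getKey()
                        + ": expected tag " + i + " but found " + sorted.get(i));
                }
            }
        }
    }
}
```

Assuming no other tagged fields exist at the top level, CurrentLeader at tag 0 in the per-partition list and NodeEndpoints at tag 0 at the top level each form the scope {0} and pass; retagging NodeEndpoints to 1 leaves the top-level scope as {1}, which fails.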



##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -59,10 +61,26 @@
 "about": "The error message of the record that caused the batch to be dropped"}
 ]},
 { "name":  "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable":  true,
-  "about":  "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
+  "about":  "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
+{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [
+  { "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId",
+"about": "The ID of the current leader or -1 if the leader is unknown."},
+  { "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"}
+]}
   ]}
 ]},
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
-  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
+  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,

Review Comment:
   same response as in FetchResponse.json






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367112293


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -619,6 +641,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 request.header.clientId,
 topicPartition,
 status.error.exceptionName))
+
+  if (request.header.apiVersion >= 10) {
+status.currentLeader = {
+  status.error match {
+case Errors.NOT_LEADER_OR_FOLLOWER =>
+  val leaderNode = getCurrentLeader(topicPartition, request.context.listenerName)
+  nodeEndpoints.put(leaderNode.node.id(), leaderNode.node)
+  new LeaderIdAndEpoch()
+.setLeaderId(leaderNode.leaderId)
+.setLeaderEpoch(leaderNode.leaderEpoch)
+case _ =>
+  null

Review Comment:
   Yeah, looking at it again, currentLeader should already be set to the default, so I removed the allocation.
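A Java sketch of the error-path-only leader hint discussed here: the default LeaderIdAndEpoch is allocated once with the status, and the lookup plus the node-endpoint bookkeeping run only for NOT_LEADER_OR_FOLLOWER on v10+ requests. The real code is Scala in KafkaApis; `LeaderHintSketch`, its nested types, and the `lookupLeader` function are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the types used by handleProduceRequest.
final class LeaderHintSketch {
    enum ProduceError { NONE, NOT_LEADER_OR_FOLLOWER }

    static final class Node {
        final int id;
        final String host;
        final int port;
        Node(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
    }

    static final class LeaderNode {
        final int leaderId;
        final int leaderEpoch;
        final Node node;
        LeaderNode(int leaderId, int leaderEpoch, Node node) {
            this.leaderId = leaderId;
            this.leaderEpoch = leaderEpoch;
            this.node = node;
        }
    }

    // Leader hint with the schema defaults (-1, -1).
    static final class LeaderIdAndEpoch {
        int leaderId = -1;
        int leaderEpoch = -1;
    }

    // Per-partition status: the default hint is allocated once, together with the status.
    static final class PartitionStatus {
        final ProduceError error;
        final LeaderIdAndEpoch currentLeader = new LeaderIdAndEpoch();
        PartitionStatus(ProduceError error) { this.error = error; }
    }

    static void fillLeaderHint(short apiVersion, String partition, PartitionStatus status,
                               Function<String, LeaderNode> lookupLeader,
                               Map<Integer, Node> nodeEndpoints) {
        // Below v10 the schema has no CurrentLeader field, and successful partitions
        // need no hint: both keep the default and skip the lookup entirely.
        if (apiVersion < 10 || status.error != ProduceError.NOT_LEADER_OR_FOLLOWER) {
            return;
        }
        LeaderNode leader = lookupLeader.apply(partition);
        // Keyed by broker id, so partitions that share a leader add a single endpoint.
        nodeEndpoints.put(leader.node.id, leader.node);
        status.currentLeader.leaderId = leader.leaderId;
        status.currentLeader.leaderEpoch = leader.leaderEpoch;
    }
}
```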






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-20 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1367101090


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -125,7 +125,6 @@ class ReplicaFetcherThreadTest {
 val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
 
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
 
-assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())

Review Comment:
   It looks like IBP_3_7_IV0 was already added; I was on an old version. I bumped the fetch version, so removing this assertion isn't necessary anymore.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on PR #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-1771527507

   @chb2ab, is there a JIRA for this work? If not, can we create one and format the PR title as the JIRA title?





Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365967820


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -125,7 +125,6 @@ class ReplicaFetcherThreadTest {
 val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
 
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
 
-assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())

Review Comment:
   If we do plan to address the MV in a follow-up, we should definitely call it out and file a JIRA that is a blocker for 3.7.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365967290


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##
@@ -125,7 +125,6 @@ class ReplicaFetcherThreadTest {
 val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
 
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
 
-assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())

Review Comment:
   Hmmm -- not sure if we want to remove this. 






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365966459


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -619,6 +641,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 request.header.clientId,
 topicPartition,
 status.error.exceptionName))
+
+  if (request.header.apiVersion >= 10) {
+status.currentLeader = {
+  status.error match {
+case Errors.NOT_LEADER_OR_FOLLOWER =>
+  val leaderNode = getCurrentLeader(topicPartition, request.context.listenerName)
+  nodeEndpoints.put(leaderNode.node.id(), leaderNode.node)
+  new LeaderIdAndEpoch()
+.setLeaderId(leaderNode.leaderId)
+.setLeaderEpoch(leaderNode.leaderEpoch)
+case _ =>
+  null

Review Comment:
   Could this just be the default LeaderIdAndEpoch?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365966194


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -98,13 +116,20 @@ private static ProduceResponseData toData(Map
 .setLogAppendTimeMs(response.logAppendTime)
 .setErrorMessage(response.errorMessage)
 .setErrorCode(response.error.code())
+.setCurrentLeader(response.currentLeader != null ? response.currentLeader : new LeaderIdAndEpoch())

Review Comment:
   Or alternatively pass in the default and not have to do a check here.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365958420


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -102,6 +104,15 @@
   "about": "The preferred read replica for the consumer to use on its next fetch request"},
 { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
   ]}
+]},

Review Comment:
   Should we be using the same tag here as diverging epoch?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365957228


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -59,10 +61,26 @@
 "about": "The error message of the record that caused the batch to be dropped"}
 ]},
 { "name":  "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable":  true,
-  "about":  "The global error message summarizing the common root cause of the records that caused the batch to be dropped"}
+  "about":  "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
+{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "10+", "taggedVersions": "10+", "tag": 0, "fields": [
+  { "name": "LeaderId", "type": "int32", "versions": "10+", "default": "-1", "entityType": "brokerId",
+"about": "The ID of the current leader or -1 if the leader is unknown."},
+  { "name": "LeaderEpoch", "type": "int32", "versions": "10+", "default": "-1", "about": "The latest known leader epoch"}
+]}
   ]}
 ]},
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
-  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
+  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,

Review Comment:
   Should we be using the same tag as the CurrentLeader field?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365955785


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -98,13 +116,20 @@ private static ProduceResponseData toData(Map
 .setLogAppendTimeMs(response.logAppendTime)
 .setErrorMessage(response.errorMessage)
 .setErrorCode(response.error.code())
+.setCurrentLeader(response.currentLeader != null ? response.currentLeader : new LeaderIdAndEpoch())

Review Comment:
   do we need to set anything here if the response is null?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365955504


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -156,7 +182,11 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long
 }
 
 public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
-this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage);
+this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
+}
+
+public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
+this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());

Review Comment:
   Or, even better, can we just leave it empty and use a default? Or does that bloat the constructors more?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-19 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1365954460


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -156,7 +182,11 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long
 }
 
 public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
-this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage);
+this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
+}
+
+public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) {
+this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());

Review Comment:
   can we remove the ProduceResponseData prefixes here?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-18 Thread via GitHub


chb2ab commented on PR #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-1768509901

   I reran all the failing tests locally and they passed. I'm not sure if anything else needs to be done, but they look like flaky tests.





Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362471338


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   OK, so my understanding is that for partition reassignments it would be better to go directly to the metadata cache, but when leadership moves within the replica set it is better to go to the replica manager first. I think we should prioritize leadership moves within the replica set here, since they seem more common. What do you all think?
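The two lookup orders debated in this thread differ only in which source is asked first. A minimal sketch of the fallback chain, assuming each source is modeled as a function that returns an empty Optional when it does not know the partition; `LeaderLookupSketch` and its types are hypothetical, and the (-1, -1) result mirrors the `case None => (-1, -1)` branch in the Scala above.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical fallback chain for resolving a partition's leader and epoch.
final class LeaderLookupSketch {
    static final class LeaderAndEpoch {
        final int leaderId;
        final int leaderEpoch;
        LeaderAndEpoch(int leaderId, int leaderEpoch) {
            this.leaderId = leaderId;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Reported when neither source knows the partition.
    static final LeaderAndEpoch UNKNOWN = new LeaderAndEpoch(-1, -1);

    // Tries the sources in the order given; the first non-empty answer wins.
    static LeaderAndEpoch currentLeader(String partition,
                                        Function<String, Optional<LeaderAndEpoch>> primary,
                                        Function<String, Optional<LeaderAndEpoch>> fallback) {
        return primary.apply(partition)
            .or(() -> fallback.apply(partition)) // Optional.or requires Java 9+
            .orElse(UNKNOWN);
    }
}
```

Passing the local replica state as `primary` and the metadata cache as `fallback` gives the replica-manager-first order proposed here; swapping the two arguments gives the metadata-cache-first order splett2 argues for.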



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362384730


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   We would, but we would actually look it up twice in that path :) 






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362384730


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   We would, but we would actually look it up in the metadata cache twice in that path :) 






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362351368


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   The metadata cache having more up-to-date information makes sense to me, but I don't follow the deletion case: wouldn't reading from the replica manager return NOT_LEADER_OR_FOLLOWER there? It seems like we should still fall back to the metadata cache in that case.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362351368


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   The metadata cache having more up-to-date information makes sense to me, but I don't follow the deletion case: wouldn't reading from the replica manager return NOT_LEADER_OR_FOLLOWER there? It seems like we should still fall back to the cache in that case.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362351368


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   The metadata cache having more up-to-date information makes sense to me, but I don't follow the deletion case: wouldn't reading from the replica manager return NOT_LEADER_OR_FOLLOWER there?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-17 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1362252576


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   the extra object allocation is not a big issue, since the new leader lookups 
are only done in error cases, not in the common case.
   
   populating the new leader state from the `Partition` also doesn't work for 
cases where the partition gets deleted from the leader, for instance with 
reassignments, so populating from the metadata cache is both more likely 
to have up-to-date information (in KRaft mode, which we should assume to be the 
default) and handles NotLeader in more cases.
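A minimal Java sketch of the metadata-cache-only approach described above. It assumes a simplified cache: one map from a "topic-partition" string to (leaderId, leaderEpoch) and one map of alive broker endpoints. `MetadataOnlyLeaderLookup`, its methods, and the string keys are hypothetical stand-ins, not Kafka's `MetadataCache` API. What it illustrates is that the lookup never depends on the partition still being hosted locally, so a partition removed from the old leader by a reassignment still resolves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of a metadata-cache-only leader lookup.
// The maps stand in for the metadata image; this is not Kafka's MetadataCache API.
class MetadataOnlyLeaderLookup {
    static final int NO_LEADER = -1;

    /** Mirrors the LeaderNode case class: leader id, leader epoch, and the endpoint if alive. */
    static final class LeaderNode {
        final int leaderId;
        final int leaderEpoch;
        final Optional<String> endpoint;

        LeaderNode(int leaderId, int leaderEpoch, Optional<String> endpoint) {
            this.leaderId = leaderId;
            this.leaderEpoch = leaderEpoch;
            this.endpoint = endpoint;
        }
    }

    // "topic-partition" -> {leaderId, leaderEpoch}, as known from the latest metadata image.
    private final Map<String, int[]> partitionLeaders = new HashMap<>();
    // brokerId -> "host:port" for brokers currently alive.
    private final Map<Integer, String> aliveBrokers = new HashMap<>();

    void updatePartition(String tp, int leaderId, int leaderEpoch) {
        partitionLeaders.put(tp, new int[] {leaderId, leaderEpoch});
    }

    void registerBroker(int brokerId, String hostPort) {
        aliveBrokers.put(brokerId, hostPort);
    }

    /** Consults only the metadata view, so it works even if the partition was removed locally. */
    LeaderNode currentLeader(String tp) {
        int[] state = partitionLeaders.get(tp);
        if (state == null) {
            // Unknown partition: same -1/-1 sentinel as the diff.
            return new LeaderNode(NO_LEADER, NO_LEADER, Optional.empty());
        }
        return new LeaderNode(state[0], state[1], Optional.ofNullable(aliveBrokers.get(state[0])));
    }
}
```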






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1361154283


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   > Since we are using MV/IBP bumps for fetches, a simple thing to do would be 
to pick up the newest MV for this release and include the fetch bump here
   
   ok, so my understanding is that instead of bumping IBP to `IBP_3_7_IV1`, we 
would wait for the release of `IBP_3_8_IV0` to bump the fetch version here to 16.
   
   > The alternative is setting up the fetch path to use ApiVersions to ensure 
the correct version. But that might be out of scope for this change.
   
   yeah, I would need to look more into this; I'm not familiar enough with it to 
know how it might look.
   
   
   






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1361154283


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   > Since we are using MV/IBP bumps for fetches, a simple thing to do would be 
to pick up the newest MV for this release and include the fetch bump here
   
   ok, so my understanding is that instead of bumping IBP to `IBP_3_7_IV1`, we 
would wait for the release of `IBP_3_8_IV0` to bump the fetch version here to 16.
   
   > The alternative is setting up the fetch path to use ApiVersions to ensure 
the correct version. But that might be out of scope for this change.
   
   yeah, I would need to look more into this; I'm not familiar enough with it to 
know how it might look.
   
   
   






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1361075604


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   I looked more into this: the replica manager looks up the partition 
in a Pool object, while the metadata cache looks it up in the current image and 
creates a new UpdateMetadataPartitionState to return. I think using the replica 
manager avoids that allocation. Also, since the fetch/produce paths should 
have recently read through the replica manager, I think it's more likely to 
give an in-memory cache hit than the metadata path. It still seems better to me 
to try the replica manager first; what do you all think?
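For comparison, a sketch of the replica-manager-first order that `getCurrentLeader` uses in the diff, with two hypothetical maps standing in for `ReplicaManager` and `MetadataCache`. The class name, field names, and "topic-partition" keys are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "replica manager first, metadata cache second" lookup in
// getCurrentLeader. Both maps are stand-ins, not Kafka APIs.
class LeaderFallbackLookup {
    static final int UNKNOWN = -1;

    // Partitions hosted by this broker: "topic-partition" -> {leaderId, leaderEpoch}.
    final Map<String, int[]> localPartitions = new HashMap<>();
    // Cluster-wide metadata view: "topic-partition" -> {leaderId, leaderEpoch}.
    final Map<String, int[]> metadataCache = new HashMap<>();

    /** Returns {leaderId, leaderEpoch}, preferring local state, then metadata, then {-1, -1}. */
    int[] currentLeader(String tp) {
        int[] local = localPartitions.get(tp);
        if (local != null) {
            return local.clone();
        }
        // Not hosted locally (e.g. removed after a reassignment): fall back to the metadata view.
        int[] fromMetadata = metadataCache.get(tp);
        if (fromMetadata != null) {
            return fromMetadata.clone();
        }
        return new int[] {UNKNOWN, UNKNOWN};
    }
}
```

If both views know the partition but disagree, this order returns the local (possibly stale) leader, which is the trade-off raised elsewhere in the thread.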






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1361075604


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   I looked more into this: the replica manager looks up the partition 
in a Pool object, while the metadata cache looks it up in the current image and 
creates a new UpdateMetadataPartitionState to return. I think using the replica 
manager avoids that allocation. Also, since the fetch/produce paths should 
have recently read through the replica manager, I think it's more likely to 
give an in-memory cache hit than the metadata path. It seems better to me to 
try the replica manager first; what do you all think?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1361003116


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   Hmmm -- I'm not sure I understand "more commonly used path".
   ReplicaManager will have the partition if the broker hosts it. The 
metadata cache is meant to be a cache of all the partitions, so I don't think it 
loses out on "caching benefits".
   
   The benefit of the replica manager is that it also contains the log of the 
partition. If the metadata cache is sufficient (which it seems to be) we should 
probably just use that.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1360994577


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I chatted with David J about this offline. Since we are using MV/IBP bumps 
for fetches, a simple thing to do would be to pick up the newest MV for this 
release and include the fetch bump here.
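A sketch of this gating, using a trimmed, hypothetical enum in place of the real `MetadataVersion`. The constant list, and the choice of `IBP_3_7_IV0` as the newest MV carrying the fetch bump, are assumptions for illustration; older branches of `fetchRequestVersion()` are elided here just as in the diff.

```java
// Hypothetical, trimmed-down stand-in for MetadataVersion; only declaration order matters here.
enum MetadataVersionSketch {
    IBP_3_5_IV1,
    IBP_3_6_IV0,
    IBP_3_7_IV0; // assumed: the newest, not-yet-released MV that carries the fetch bump

    boolean isAtLeast(MetadataVersionSketch other) {
        return this.compareTo(other) >= 0;
    }

    /** Version used for replication fetches; only clusters on the newest MV send v16. */
    short fetchRequestVersion() {
        if (isAtLeast(IBP_3_7_IV0)) {
            return 16;
        }
        // Branches for versions older than IBP_3_5_IV1 are elided, as in the diff.
        return 15;
    }
}
```

Because the bump is tied to the MV, a broker only sends v16 once the whole cluster has been finalized at the new version, so followers never send a fetch version the leader cannot parse.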
   
   The alternative is setting up the fetch path to use ApiVersions to ensure 
the correct version. But that might be out of scope for this change. 
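A sketch of the ApiVersions-based alternative, assuming the fetcher knows both the local and the remote [min, max] version range for FETCH: use the highest version both sides support, or none if the ranges do not overlap. This is similar in spirit to how clients pick a version from a broker's ApiVersions response, but `VersionNegotiation.negotiate` is a hypothetical helper, not an existing Kafka method.

```java
import java.util.OptionalInt;

// Hypothetical helper: derive the fetch version from both sides' supported ranges
// instead of from the MetadataVersion.
final class VersionNegotiation {
    private VersionNegotiation() {}

    /** Highest version in both [localMin, localMax] and [remoteMin, remoteMax], if any. */
    static OptionalInt negotiate(int localMin, int localMax, int remoteMin, int remoteMax) {
        int candidate = Math.min(localMax, remoteMax);
        int floor = Math.max(localMin, remoteMin);
        return candidate >= floor ? OptionalInt.of(candidate) : OptionalInt.empty();
    }
}
```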
   
   With either of these approaches, we can keep the latest version for 
replication fetches, which would make things a little clearer.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-16 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1360797303


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   ok, I think the replica manager is the more commonly used path, which may have 
caching benefits, but I'm not sure. Does that make sense?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358837845


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   produce requests do not include a leader epoch, so they can never receive 
FENCED_LEADER_EPOCH.
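A sketch of the resulting rule, with a local enum standing in for Kafka's `Errors` so the example compiles on its own. The produce side follows the thread (v10+, `NOT_LEADER_OR_FOLLOWER` only). The fetch side, including the v16 threshold, is an assumption based on the fetch version bump in this PR rather than code taken from it.

```java
// Hypothetical sketch; a local enum replaces Kafka's Errors so the example is self-contained.
final class CurrentLeaderRule {
    enum Error { NONE, NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH }

    private CurrentLeaderRule() {}

    /**
     * Produce requests carry no leader epoch, so FENCED_LEADER_EPOCH cannot occur;
     * only NOT_LEADER_OR_FOLLOWER triggers populating currentLeader (produce v10+).
     */
    static boolean produceShouldPopulateLeader(int apiVersion, Error error) {
        return apiVersion >= 10 && error == Error.NOT_LEADER_OR_FOLLOWER;
    }

    /** Assumed: fetch requests do carry a leader epoch, so a fenced epoch also signals a leader change. */
    static boolean fetchShouldPopulateLeader(int apiVersion, Error error) {
        return apiVersion >= 16
            && (error == Error.NOT_LEADER_OR_FOLLOWER || error == Error.FENCED_LEADER_EPOCH);
    }
}
```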






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358837039


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   my point in the previous comment is that this will never be the case with KRaft.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358826931


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   ok, removed FENCED_LEADER_EPOCH from the produce path






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358796082


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   I think the error is only returned on fetch requests






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358773873


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   will move this into the if block.
   
   why can't produce receive FENCED_LEADER_EPOCH?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358773301


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   will update the listener name.
   
   I think the motivation for checking the replica manager first is that it may be 
faster than the metadata cache.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-13 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1358490055


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,26 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+val (leaderId, leaderEpoch) = partitionInfoOrError match {
+  case Right(x) =>
+(x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+  case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+  case None => (-1, -1)
+}
+}
+val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({

Review Comment:
   we shouldn't be passing through the inter-broker listener name; we should be 
using the listener of the original request, to be consistent with the 
metadata request.
   
   Is it simpler if we just consult the metadata cache? In KRaft mode, the 
metadata cache is the source of truth for partition leadership and is updated 
before the partition state gets updated.
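A sketch of resolving the leader's endpoint on the listener the request arrived on, rather than on the inter-broker listener. The per-listener endpoint map, the listener names, and `ListenerAwareNodes` are hypothetical; in the diff the lookup goes through `metadataCache.getAliveBrokerNode(leaderId, listenerName)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of per-listener broker endpoints; not Kafka's MetadataCache API.
class ListenerAwareNodes {
    // brokerId -> (listenerName -> "host:port")
    private final Map<Integer, Map<String, String>> endpoints = new HashMap<>();

    void register(int brokerId, String listener, String hostPort) {
        endpoints.computeIfAbsent(brokerId, id -> new HashMap<>()).put(listener, hostPort);
    }

    /** Resolves the leader on the request's listener, as a metadata response would. */
    Optional<String> leaderEndpoint(int leaderId, String requestListener) {
        // Optional.map yields empty if the broker has no endpoint on that listener.
        return Optional.ofNullable(endpoints.get(leaderId))
                .map(byListener -> byListener.get(requestListener));
    }
}
```

A client connected through an external listener would otherwise be handed an internal address it may not be able to reach.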



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -620,6 +642,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 topicPartition,
 status.error.exceptionName))
 }
+
+if (request.header.apiVersion >= 10) {
+  status.currentLeader = {
+status.error match {
+  case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>

Review Comment:
   produce requests should never receive FENCED_LEADER_EPOCH.
   
   also, shouldn't this go in the above `if` block?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357390202


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   Yeah, using foreach on Scala options is a common pattern.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357361724


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   I'll try to find time to finish the changes for 
[KAFKA-10730](https://issues.apache.org/jira/browse/KAFKA-10730); I think 
refactoring the tests would take some time, but overall I agree it doesn't seem 
too big.
   
   I'm ok with removing the deprecation, but I suspect the incentive to do the 
refactoring would then be lost, so I'm leaving it for now.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357335847


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   I think the confusion was that foreach implies a list of elements, but there 
can be at most one here. I like the use of a tuple and match/case here though; I 
will update to that.
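A Java analogue of the Scala point here, treating `Optional` as the counterpart of Scala's `Option`: since it holds zero or one value, a single `map`/`orElse` expression can replace mutable defaults updated inside a `foreach`. `OptionStyles` is a hypothetical name; both methods return {leaderId, leaderEpoch}, with {-1, -1} as the fallback used in the diff.

```java
import java.util.Optional;

// Java analogue of the Scala refactor discussed here; names are illustrative only.
final class OptionStyles {
    private OptionStyles() {}

    // Mutable style, mirroring `var leaderId = -1; partitionInfo.foreach { info => ... }`.
    static int[] withForeach(Optional<int[]> partitionInfo) {
        int[] result = {-1, -1};
        partitionInfo.ifPresent(info -> {
            result[0] = info[0];
            result[1] = info[1];
        });
        return result;
    }

    // Expression style, mirroring `match { case Some(p) => (...) case None => (-1, -1) }`.
    static int[] withExpression(Optional<int[]> partitionInfo) {
        return partitionInfo.map(info -> new int[] {info[0], info[1]})
                .orElse(new int[] {-1, -1});
    }
}
```

Both produce the same result; the expression form simply makes it explicit that there is exactly one outcome per case.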






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357293893


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I remember now why I added this: we have a test that checks we're using the 
latest version for various APIs. I guess we would need to remove that check for 
FETCH.
   
https://github.com/apache/kafka/blob/13b2edd9af802d7db99815e834b062aafc3e2313/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala#L120-L131
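A hypothetical sketch of the kind of check being described, relaxed so that an exempted API (here FETCH, while its bump is gated on the MetadataVersion) may lag behind its latest version. `LatestVersionCheck`, its enum, and the method signature are illustrative assumptions, not the actual `ReplicaFetcherThreadTest` code.

```java
import java.util.Set;

// Hypothetical relaxed "uses the latest version" check: APIs in `exempt` may lag behind
// the latest version but must never exceed it; all others must match it exactly.
final class LatestVersionCheck {
    enum Api { FETCH, LIST_OFFSETS, OFFSET_FOR_LEADER_EPOCH }

    private LatestVersionCheck() {}

    static boolean usesAcceptableVersion(Api api, int usedVersion, int latestVersion, Set<Api> exempt) {
        if (exempt.contains(api)) {
            return usedVersion <= latestVersion;
        }
        return usedVersion == latestVersion;
    }
}
```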






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357293893


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I remember now why I added this: we have a test that checks we're using the 
latest version for various APIs. I guess we would need to remove that check for 
FETCH.
   
https://github.com/apache/kafka/blob/13b2edd9af802d7db99815e834b062aafc3e2313/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala#L120-L121






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357293893


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I remember now why I added this: we have a test that checks we're using the 
latest version for various APIs. I guess we would need to remove that check for 
FETCH.
   
https://github.com/apache/kafka/blob/13b2edd9af802d7db99815e834b062aafc3e2313/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala#L120






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357265438


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I'll remove this version bump, I hadn't realized it was only for replication 
fetches.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357265438


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I'll remove this version bump.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357229686


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   It is a bit weird to not bump the version though. I guess if there is ever a 
v17 that is for replication, we would go straight to that (with MV/IBP bump) 
and that would be ok.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


splett2 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357227824


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I don't think there's any reason to bump the fetch version for replication 
fetches. This would be purely a consumer client optimization.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357208153


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I believe so.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357205210


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   Ah I hadn't thought this through. I think this would need an IBP version 
bump to avoid errors during upgrades?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357182431


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I'm also wondering if there is a flaw with this approach. This would mean we 
need to bump MV after all for inter-broker requests, right? 
https://github.com/apache/kafka/pull/1#discussion_r1357181404






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357181404


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   Clusters with IBP 3.5 were guaranteed to support version 15, since that was 
the version at which we defined the IBP. I don't think we can just change the 
version, because some clusters will have IBP 3.5 but not support version 16 fetch 
requests. We could avoid this with the tagged fields, but since we are bumping the 
version, we run into a problem.
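   A minimal sketch of the invariant described above, assuming a heavily simplified enum: `MetadataVersionSketch` is hypothetical, the real `MetadataVersion` has many more levels, and `IBP_3_7_IV0` stands in for whichever unreleased level would carry the bump. The idea is that the version returned for an already-released level is a promise every broker at that level can keep, so a new fetch version has to hang off a new level.

```java
// Hypothetical, heavily simplified stand-in for MetadataVersion. Only two levels
// are modeled; older levels and every other gated method are omitted.
enum MetadataVersionSketch {
    IBP_3_5_IV1,
    IBP_3_7_IV0; // assumption: an unreleased level chosen to gate fetch v16

    boolean isAtLeast(MetadataVersionSketch other) {
        // Declaration order doubles as version order.
        return this.compareTo(other) >= 0;
    }

    short fetchRequestVersion() {
        if (isAtLeast(IBP_3_7_IV0)) {
            // Only clusters finalized at the new level send v16, so every broker
            // they talk to is already known to understand it.
            return 16;
        } else if (isAtLeast(IBP_3_5_IV1)) {
            // Unchanged: brokers at IBP_3_5_IV1 were only ever promised v15.
            return 15;
        }
        throw new IllegalStateException("older levels are not modeled in this sketch");
    }
}
```

   With this shape, `IBP_3_5_IV1.fetchRequestVersion()` stays 15 regardless of which binary a broker runs, which is what keeps a mixed-version cluster safe during a rolling upgrade.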






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357176285


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I'm just trying to figure out if we would see unsupported version errors 
during upgrades. I think we might.
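   To make the upgrade concern concrete, here is a sketch (a hypothetical helper, not Kafka code) contrasting a sender pinned to a fixed version with one that picks the highest version both sides support. Java clients learn a broker's supported range through the ApiVersions request. As discussed in this thread, replication fetches instead use the version pinned by the MV/IBP, which is why the pin itself must not outrun what older brokers support.

```java
import java.util.OptionalInt;

// Hypothetical helper, not Kafka code: version ranges are passed as plain ints
// instead of being read from an ApiVersions response.
final class VersionNegotiationSketch {
    private VersionNegotiationSketch() {}

    /** Highest version both sides support, or empty if the ranges do not overlap. */
    static OptionalInt usableVersion(int localMin, int localMax, int peerMin, int peerMax) {
        int highest = Math.min(localMax, peerMax);
        int lowest = Math.max(localMin, peerMin);
        return highest >= lowest ? OptionalInt.of(highest) : OptionalInt.empty();
    }

    /** True if a request pinned to fixedVersion falls outside the peer's supported range. */
    static boolean wouldBeRejected(int fixedVersion, int peerMin, int peerMax) {
        return fixedVersion < peerMin || fixedVersion > peerMax;
    }
}
```

   For example, `wouldBeRejected(16, 0, 15)` is true for an upgraded broker fetching from a not-yet-upgraded leader, while `usableVersion(0, 16, 0, 15)` would fall back to 15.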






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-12 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1357169045


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   I think you're right that this could cause issues during upgrade. I think 
using the latest version should be safe.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1352614826


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   @jolshan I think that it is hard to really define a policy for this. It 
mainly depends on whether there is a justification to require an epoch bump or 
not. In this case, I believe that there is one but this may not always be true.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-10 Thread via GitHub


dajac commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1352612988


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   @hachikuji Yeah, your point is totally valid. I was pushing for this with 
the java client (and potentially librdkafka) in mind. I think that it will make 
request analysis easier as you said.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-10 Thread via GitHub


msn-tldr commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1352576812


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   @jolshan +1






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350945079


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
 if (this.isAtLeast(IBP_3_5_IV1)) {
-return 15;
+return 16;

Review Comment:
   Hmm. Is this correct? In the upgrade scenario we will send request version 
16 to brokers that may not have that version yet. I know we just ignore tagged 
fields, but I'm not sure I recall if we can handle version bumps.
   
   If this is always just the latest version, should it be hardcoded?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350936703


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   
   ```
   val partitionInfo = partitionInfoOrError match {
     case Right(partitionInfo) =>
       partitionInfo
     case Left(error) =>
       debug(s"Unable to retrieve local leaderId and Epoch with error $error, falling back to metadata cache")
       metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
         case Some(partitionInfo) => partitionInfo
         case None => ??? // handle the case where we don't have the partition
       }
   }
   ```
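   For comparison, a Java sketch of the same shape. All names are hypothetical: `LeaderSource` stands in for both `replicaManager.getPartitionOrError` and `metadataCache.getPartitionInfo`, and the error/`debug` branch is collapsed into an empty `Optional`. The point is to compute one value through a fallback chain instead of mutating vars, and to give the not-found case an explicit result.

```java
import java.util.Optional;

// Hypothetical Java analog of the Scala refactor above (requires Java 16+ for records).
// None of these types exist in Kafka.
final class CurrentLeaderSketch {
    private CurrentLeaderSketch() {}

    record LeaderNode(int leaderId, int leaderEpoch) {
        // Explicit result for a partition that neither source knows about.
        static final LeaderNode UNKNOWN = new LeaderNode(-1, -1);
    }

    // Stand-in for both the local partition lookup and the metadata cache lookup.
    interface LeaderSource {
        Optional<LeaderNode> lookup(String topic, int partition);
    }

    static LeaderNode currentLeader(String topic, int partition,
                                    LeaderSource local, LeaderSource cache) {
        return local.lookup(topic, partition)
            // Consult the metadata cache only if the local lookup came back empty.
            .or(() -> cache.lookup(topic, partition))
            // Single expression, no mutable state; the not-found case is spelled out.
            .orElse(LeaderNode.UNKNOWN);
    }
}
```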
   
   






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350934417


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   We also don't need to set vars here. We could have a statement where we 
return a tuple or even just the partitionInfo.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350934417


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   We also don't need to set vars here. We could have a statement where we 
return a tuple.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350933806


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   getPartitionInfo returns an Option. If it exists, foreach will access it; if 
it doesn't, foreach does nothing. This is a common pattern in Scala. Are we 
considering the case when the partition is not present?
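   The `Option.foreach` behavior described above has a direct Java analog in `Optional.ifPresent`. A small sketch (method names are hypothetical) of why the var-based version is easy to misread: when the value is absent nothing runs, and the result silently stays at the `-1` default.

```java
import java.util.Optional;

// Java analog of Scala's Option.foreach; method names are hypothetical.
final class OptionForeachSketch {
    private OptionForeachSketch() {}

    /** Mirrors the var-based pattern: if the value is absent, the body never runs. */
    static int epochViaIfPresent(Optional<Integer> cachedEpoch) {
        int[] epoch = {-1}; // one-element array so the lambda can update it, like a Scala var
        cachedEpoch.ifPresent(e -> epoch[0] = e);
        return epoch[0];
    }

    /** Same result, with the absent case written at the call site instead of implied. */
    static int epochViaOrElse(Optional<Integer> cachedEpoch) {
        return cachedEpoch.orElse(-1);
    }
}
```

   Both return -1 for `Optional.empty()`. The second just makes that outcome visible where it happens, which is what the question about the missing-partition case is getting at.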






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350932166


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData 
produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, 
int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   KAFKA-10730 is a pretty dormant JIRA. I do agree that there is some level of 
conversion. I wonder if folks still have a strong opinion about this 
conversion.
   
   Looking into this further, I see the change would need to be made to 
appendRecords and the ProducePartitionStatus. It doesn't look too crazy, but 
it's also understandable that this is out of scope for this PR.
   
   I wonder if KAFKA-9682 was premature in deprecating the constructor. I guess 
our options are leaving it deprecated and adding a deprecated method, or 
removing the deprecation until KAFKA-10730 is completed. (I almost just want to 
fix it so this doesn't happen in the future.)






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350511364


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   Are we in agreement, then, that we should always bump the version for tagged 
fields? Unless we explicitly plan to backport the changes (i.e., what Jeff did for 
the consumer group changes?)
   
   I had some folks ask me what the correct protocol is, and it would be nice 
to give consistent answers.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-06 Thread via GitHub


hachikuji commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1349226317


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   The tricky thing is that bumping the protocol version does not imply client 
support (the client can ignore tagged fields). But it does help for the 
standard client since we know the feature will be supported. That probably does 
make request analysis easier. So I guess I'm appeased.
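   The "client can ignore tagged fields" behavior comes from the flexible-version encoding: each tagged field carries its own length, so a reader can skip tags it does not recognize. A simplified decoder sketch, assuming the layout of an unsigned-varint field count followed by (tag, size, bytes) triples. The class is hypothetical and does no validation (tag ordering, varint overflow, truncated input).

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of tagged-field skipping; not Kafka's generated code.
final class TaggedFieldsSketch {
    private TaggedFieldsSketch() {}

    static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
        // 7 payload bits per byte; the high bit marks "more bytes follow".
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xFF;
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }

    /** Returns payloads for known tags; unknown tags are skipped using their size prefix. */
    static Map<Integer, byte[]> readTaggedFields(ByteBuffer buf, Set<Integer> knownTags) {
        Map<Integer, byte[]> known = new HashMap<>();
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            if (knownTags.contains(tag)) {
                byte[] payload = new byte[size];
                buf.get(payload);
                known.put(tag, payload);
            } else {
                buf.position(buf.position() + size); // skip without interpreting
            }
        }
        return known;
    }
}
```

   An older reader that only knows tag 0 would still consume a response carrying an extra tag 5 cleanly; it just never sees that field.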






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-06 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1349137249


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData 
produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, 
int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   I started implementing this but I do think it's getting out of scope of this 
PR. 
[These](https://github.com/chb2ab/kafka/commit/e7bbd7b463db115e2f7b14618ea14da0ee893ae4)
 are my initial changes. To finish, I think we want to remove PartitionResponse 
completely and replace it with PartitionProduceResponse; otherwise we're just 
moving the conversion around. Having a deprecated constructor isn't ideal, but I
think we should remove it with KAFKA-10730, not this. @jolshan what do you 
think?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-06 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1349137249


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData 
produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, 
int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   I started implementing this but I do think it's getting out of scope of this 
PR. 
[These](https://github.com/chb2ab/kafka/commit/e7bbd7b463db115e2f7b14618ea14da0ee893ae4)
 are my initial changes. To finish, I think we want to remove PartitionResponse 
completely and replace it with PartitionProduceResponse. Having a deprecated 
constructor isn't ideal, but I think we should remove it with KAFKA-10730, not
this. @jolshan what do you think?






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-06 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1349136486


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -276,11 +289,16 @@ private static FetchResponseData toMessage(Errors error,
 .setPartitions(partitionResponses));
 }
 }
-
-return new FetchResponseData()
-.setThrottleTimeMs(throttleTimeMs)
-.setErrorCode(error.code())
-.setSessionId(sessionId)
-.setResponses(topicResponseList);
+data.setThrottleTimeMs(throttleTimeMs)
+.setErrorCode(error.code())
+.setSessionId(sessionId)
+.setResponses(topicResponseList);
+nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(

Review Comment:
   Reordered to make things cleaner.






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-06 Thread via GitHub


dajac commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348494902


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I actually pushed for this. I wanted to know when a client supports this new 
mechanism or not. Without a version bump, it is hard to tell.
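A minimal sketch of the trade-off described here: with a dedicated version bump, the broker can tell from the request version alone whether the client has opted in to the new leader hints. The class, constant, and method names are hypothetical and not taken from the PR; the sketch assumes version 10 is the first ProduceResponse version carrying `CurrentLeader` and `NodeEndpoints`, as in the schema comment in the diff.

```java
public final class LeaderHintGate {
    // Hypothetical: first ProduceResponse version that carries CurrentLeader and NodeEndpoints,
    // per the "Version 10 adds ..." comment in ProduceResponse.json.
    static final short FIRST_VERSION_WITH_LEADER_HINTS = 10;

    private LeaderHintGate() { }

    /**
     * Returns true if a client that sent this request version is known to understand the new fields.
     * A version 9 client would still ignore unknown tagged fields, but the broker could not tell
     * whether it acts on them; a version 10 client has opted in explicitly.
     */
    static boolean clientSupportsLeaderHints(short requestVersion) {
        return requestVersion >= FIRST_VERSION_WITH_LEADER_HINTS;
    }
}
```

Without the bump, the same check would have to rely on something other than the version, which is the ambiguity the comment points out.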



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348136534


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I was also wondering about this and found the same discussion on the mailing
list thread. Ismael also +1'd this comment.
   
   I think generally we should have a strategy with respect to tagged fields. 
Should we always use the lowest tagged version? Should we only do that if older 
clients can use it/we backport changes for it?
   
   If we plan to bump the version for tagged fields anyway, is the only benefit of
tagging that we save some bytes over the wire when we don't include the value in
the response? (I'm also a bit unsure about the "saving bytes" here, since we would
just have an empty node endpoint array.)



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348134556


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   Taking a closer look, it seems there was an effort to build the response
directly rather than pass in data structures (new maps) that are just
converted via toData.



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348129728


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
  * @param responses Produced data grouped by topic-partition
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 @Deprecated
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this(toData(responses, throttleTimeMs));
+this(toData(responses, throttleTimeMs, Collections.emptyList()));
+}
+
+/**
+ * Constructor for the latest version
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+@Deprecated

Review Comment:
   I think all we would need to do is create the produceResponseData object. I 
don't think it would be too much work. I'm a little wary of adding more 
deprecated constructors.



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348126229


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -65,24 +67,41 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
 
 /**
  * Constructor for Version 0
+ * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that
+ * in KAFKA-10730
  * @param responses Produced data grouped by topic-partition
  */
 @Deprecated
 public ProduceResponse(Map responses) {
-this(responses, DEFAULT_THROTTLE_TIME);
+this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
 }
 
 /**
- * Constructor for the latest version
+ * Constructor for versions <= 9
+ * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that
+ * in KAFKA-10730

Review Comment:
   I don't think this comment is correct. 



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348125403


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -276,11 +289,16 @@ private static FetchResponseData toMessage(Errors error,
 .setPartitions(partitionResponses));
 }
 }
-
-return new FetchResponseData()
-.setThrottleTimeMs(throttleTimeMs)
-.setErrorCode(error.code())
-.setSessionId(sessionId)
-.setResponses(topicResponseList);
+data.setThrottleTimeMs(throttleTimeMs)
+.setErrorCode(error.code())
+.setSessionId(sessionId)
+.setResponses(topicResponseList);
+nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(

Review Comment:
   Could we also create the nodeEndpoints together, so we keep the final data
object relatively clean? (I.e., we generate the response data structure above, so we
could also generate the endpoints data structure above.)
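One way to read this suggestion, as a sketch: collect the endpoint entries into their own list alongside the other per-response structures, so the final object is assembled in a single builder chain with no trailing `forEach`. The types `BrokerNode`, `NodeEndpoint`, and `ResponseData` are hypothetical stand-ins, not Kafka's generated message classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public final class EndpointAssembly {
    // Hypothetical stand-ins; not Kafka's generated message classes.
    record BrokerNode(int id, String host, int port, String rack) { }
    record NodeEndpoint(int nodeId, String host, int port, String rack) { }

    static final class ResponseData {
        int throttleTimeMs;
        short errorCode;
        int sessionId;
        List<NodeEndpoint> nodeEndpoints = new ArrayList<>();

        ResponseData setThrottleTimeMs(int v) { throttleTimeMs = v; return this; }
        ResponseData setErrorCode(short v) { errorCode = v; return this; }
        ResponseData setSessionId(int v) { sessionId = v; return this; }
        ResponseData setNodeEndpoints(List<NodeEndpoint> v) { nodeEndpoints = v; return this; }
    }

    static ResponseData toMessage(short errorCode, int throttleTimeMs, int sessionId, List<BrokerNode> nodes) {
        // Build the endpoint list up front, like the topic response list...
        List<NodeEndpoint> endpoints = nodes.stream()
            .map(n -> new NodeEndpoint(n.id(), n.host(), n.port(), n.rack()))
            .collect(Collectors.toList());

        // ...so the final data object is assembled in one chain.
        return new ResponseData()
            .setThrottleTimeMs(throttleTimeMs)
            .setErrorCode(errorCode)
            .setSessionId(sessionId)
            .setNodeEndpoints(endpoints);
    }
}
```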



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1348118773


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.

Review Comment:
   Request and response version should always be the same :) 
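The convention that a request and its response share the same version range could be checked mechanically. This is a hypothetical helper, not part of Kafka's message generator; it assumes `validVersions` strings of the simple `min-max` (or single-version) form used in the JSON above.

```java
public final class VersionRangeCheck {
    record Range(int min, int max) { }

    private VersionRangeCheck() { }

    /** Parses "lo-hi" or a single "v" into an inclusive version range. */
    static Range parse(String validVersions) {
        String s = validVersions.trim();
        int dash = s.indexOf('-');
        if (dash < 0) {
            int v = Integer.parseInt(s);
            return new Range(v, v);
        }
        int lo = Integer.parseInt(s.substring(0, dash).trim());
        int hi = Integer.parseInt(s.substring(dash + 1).trim());
        if (lo > hi) {
            throw new IllegalArgumentException("bad range: " + validVersions);
        }
        return new Range(lo, hi);
    }

    /** Throws if the ranges differ, e.g. a FetchRequest at "0-16" paired with a FetchResponse at "0-15". */
    static void requireSameRange(String requestVersions, String responseVersions) {
        Range req = parse(requestVersions);
        Range resp = parse(responseVersions);
        if (!req.equals(resp)) {
            throw new IllegalStateException("request " + req + " != response " + resp);
        }
    }
}
```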



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347868792


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I didn't realize this was brought up in previous discussions. It looks like
we decided to bump the version number to make it clearer which clients have
implemented the feature, per an email from @dajac:
   
   > Personally, I would rather prefer to bump both versions and to add the
   tagged fields. This would allow us to better reason about what the client
   is supposed to do when we see the version on the server side. Otherwise, we
   will never know if the client uses this or not.
   
   @hachikuji does this sound good?



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347787189


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   ok, I'm fine with keeping the version the same, but we should update the 
KIP. @msn-tldr do you see any issues with this?



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


hachikuji commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347783310


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   New tagged fields would be unknown to older clients, so they would ignore 
them. It would not affect their ability to deserialize.
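This follows from how flexible versions encode tagged fields (KIP-482): the section starts with an unsigned-varint count, and each field is an unsigned-varint tag, an unsigned-varint size, and then that many bytes. A reader that does not recognize a tag can therefore skip it by its size. The sketch below decodes such a section with plain `ByteBuffer` calls; it is a simplified illustration with hypothetical names, not Kafka's generated deserialization code.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class TaggedFieldReader {
    private TaggedFieldReader() { }

    /** Reads an unsigned varint: 7 bits per byte, least-significant group first, at most 5 bytes. */
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7f) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint too long");
            }
        }
    }

    /**
     * Reads a tagged-field section. Fields whose tag is in knownTags are returned;
     * all others are skipped by their size, so tags added later do not break older readers.
     */
    static Map<Integer, byte[]> readTaggedFields(ByteBuffer buf, Set<Integer> knownTags) {
        Map<Integer, byte[]> known = new HashMap<>();
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            if (knownTags.contains(tag)) {
                byte[] data = new byte[size];
                buf.get(data);
                known.put(tag, data);
            } else {
                buf.position(buf.position() + size); // unknown tag: skip its payload
            }
        }
        return known;
    }
}
```

For example, an "old" reader that only knows tag 0 consumes a section containing tags 0 and 5 completely and returns just tag 0's payload.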



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347762660


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I'm not sure if this is absolutely necessary; I was going off the KIP.
But I do think there could be an issue with leaving the version the same. If a
client is still using the old protocol definition and the server returns a
message based on the new definition but with the same version number, wouldn't
the client deserialize it incorrectly?



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347762660


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   I'm not sure if this is absolutely necessary; I was going off the KIP.
But I do think there could be an issue with leaving the version the same. If a
client is still using the old protocol definition and the server returns a
message based on the new definition but with the same version number, I think
the client would deserialize it incorrectly?



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-05 Thread via GitHub


hachikuji commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1347719979


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   Why do we need to bump the version if we are just adding tagged fields?



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-04 Thread via GitHub


msn-tldr commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1346257999


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.

Review Comment:
   @chb2ab that's correct.



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-04 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1346244195


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -276,11 +289,16 @@ private static FetchResponseData toMessage(Errors error,
 .setPartitions(partitionResponses));
 }
 }
-
-return new FetchResponseData()
-.setThrottleTimeMs(throttleTimeMs)
-.setErrorCode(error.code())
-.setSessionId(sessionId)
-.setResponses(topicResponseList);
+data.setThrottleTimeMs(throttleTimeMs)
+.setErrorCode(error.code())
+.setSessionId(sessionId)
+.setResponses(topicResponseList);
+nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(

Review Comment:
   added



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


yangy commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1343485049


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)

Review Comment:
   I see



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1343033045


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -210,6 +238,12 @@ public String toString() {
 b.append(logStartOffset);
 b.append(", recordErrors: ");
 b.append(recordErrors);
+b.append(", currentLeader: ");
+if (currentLeader != null) {

Review Comment:
   Looking at the Javadocs, I think you're right: this could be replaced by
`b.append(currentLeader)`. I'm not sure why `errorMessage` was written this
way. It looks like it was changed explicitly in [this
commit](https://github.com/apache/kafka/commit/f41a5c2c8632bfd0dc50321c1c69418db04f42f6#diff-82aef2b279f7d0093b5e7bbd34cbf9abfa6bb5ed454c72419c03dbe2e58e0eab),
but I don't see a reason for it, so I could probably change that as well.
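For reference on the `toString` point: `StringBuilder.append(Object)` behaves as if the argument were converted with `String.valueOf(Object)`, which yields the string "null" for a null reference. An explicit null check is therefore only needed if a different placeholder is wanted. A minimal demonstration, with a hypothetical `describe` helper standing in for the `toString` body:

```java
public final class AppendNullDemo {
    private AppendNullDemo() { }

    // Hypothetical helper mirroring the toString pattern in the diff.
    static String describe(Object currentLeader) {
        StringBuilder b = new StringBuilder();
        b.append("currentLeader: ");
        b.append(currentLeader); // appends "null" when currentLeader is null; no NullPointerException
        return b.toString();
    }
}
```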



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342844776


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)

Review Comment:
   I don't think so: `getPartitionInfo` returns an `Option`, so the equivalent of null
would be an empty `Option`. We don't seem to null-check this value elsewhere
either.
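The same null-versus-empty distinction can be illustrated in Java, with `Optional` playing the role of Scala's `Option`. This sketch mirrors the overall shape of `getCurrentLeader` under simplifying assumptions: both lookups are modeled as functions returning `Optional` (the real local lookup returns an `Either`), and the `LeaderInfo` record and lookup parameters are hypothetical. When neither source knows the partition, both id and epoch fall back to -1, as in the diff.

```java
import java.util.Optional;
import java.util.function.Function;

public final class LeaderLookup {
    private LeaderLookup() { }

    // Hypothetical stand-in for the leader id/epoch pair in the diff.
    record LeaderInfo(int leaderId, int leaderEpoch) { }

    static final LeaderInfo UNKNOWN = new LeaderInfo(-1, -1);

    /**
     * Prefers the local replica's view and falls back to the metadata cache.
     * An absent value is an empty Optional, never null, so no null check is needed.
     */
    static LeaderInfo currentLeader(String partition,
                                    Function<String, Optional<LeaderInfo>> local,
                                    Function<String, Optional<LeaderInfo>> metadataCache) {
        return local.apply(partition)
            .or(() -> metadataCache.apply(partition)) // Optional.or requires Java 9+
            .orElse(UNKNOWN);
    }
}
```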



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342850556


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   Looking at other uses of `partitionInfo`, I think this is a style choice. The
`Option` returned by `getPartitionInfo` holds at most one value, so `foreach`
runs at most once; I think this is just a more succinct way of
accessing it.



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342844776


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)

Review Comment:
   I don't think so: `getPartitionInfo` returns an `Option`, so the equivalent of null
would be an empty `Option`.


