[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-25 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494480847



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults
+.groupBy { case (tp, _) => tp.topic }   // Group by topic
+.foreach { case (topic, partitions) =>
+  // Add each topic part to the response
+  val topicResp = new AlterIsrResponseData.TopicData()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  resp.topics.add(topicResp)
+  partitions.foreach { case (tp, errorOrIsr) =>
+// Add each partition part to the response (new ISR or error)
+errorOrIsr match {
+  case Left(error) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  case Right(leaderAndIsr) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setLeaderId(leaderAndIsr.leader)
+  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+}
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val response = try {
+  val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+  // Determine which partitions we will accept the new ISR for
+  val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+case Some(leaderIsrAndControllerEpoch) =>
+  val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+  if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
   Consider the following scenario:
   
   1) broker sends AlterIsr
   2) the update succeeds but the response is lost
   3) broker retries AlterIsr
   
   Currently the leader will be stuck after 3) because it has no way to get the 
latest LeaderAndIsr state once the response to the first attempt is lost. To handle this, I think 
we need to add an idempotence check here. After we have validated the leader 
epoch, if the intended state matches the current state, then we can just return 
the current state. 

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-24 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494660983



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1366,6 +1366,12 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  /**
+   * This is called for each partition in the body of an AlterIsr response. 
For errors which are non-retryable we simply

Review comment:
   nit: conventionally we prefer "retriable"

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1375,11 +1381,14 @@ class Partition(val topicPartition: TopicPartition,
   case Errors.FENCED_LEADER_EPOCH =>
 debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
   case Errors.INVALID_UPDATE_VERSION =>
-debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
-sendAlterIsrRequest(isrState)
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
   case _ =>
-warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
-sendAlterIsrRequest(isrState)
+if (isrState.isInflight) {
+  warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+  sendAlterIsrRequest(isrState)
+} else {
+  warn(s"Ignoring failed ISR update to 
${proposedIsr.mkString(",")} since due to $error since we have a committed 
ISR.")

Review comment:
   nit (for follow-up): fix grammar "since due"

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1847,21 +1847,22 @@ class KafkaController(val config: KafkaConfig,
   // Determine which partitions we will accept the new ISR for
   val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
 case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+  controllerContext.partitionLeadershipInfo(tp) match {
 case Some(leaderIsrAndControllerEpoch) =>
   val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
   if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
-Errors.FENCED_LEADER_EPOCH
+partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+None
+  } else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) {
+// If a partition is already in the desired state, just return 
it

Review comment:
   It might be worth mentioning that this could happen in the case of a 
retry after a successful update.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3066,20 +3066,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
 val alterIsrRequest = request.body[AlterIsrRequest]
 
-if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-  if (!controller.isActive) {
-sendResponseMaybeThrottle(request, requestThrottleMs =>
-  alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))
-  } else {
-controller.alterIsrs(alterIsrRequest.data,
-  alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs 
=>
-new 
AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs))
-  )
-)
-  }
-} else {
+if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
   sendResponseMaybeThrottle(request, requestThrottleMs =>
 alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+} else if (!controller.isActive) {
+  sendResponseMaybeThrottle(request, requestThrottleMs =>
+alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))

Review comment:
   nit: leave off the parentheses after `exception`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-24 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494480847



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults
+.groupBy { case (tp, _) => tp.topic }   // Group by topic
+.foreach { case (topic, partitions) =>
+  // Add each topic part to the response
+  val topicResp = new AlterIsrResponseData.TopicData()
+.setName(topic)
+.setPartitions(new util.ArrayList())
+  resp.topics.add(topicResp)
+  partitions.foreach { case (tp, errorOrIsr) =>
+// Add each partition part to the response (new ISR or error)
+errorOrIsr match {
+  case Left(error) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  case Right(leaderAndIsr) => topicResp.partitions.add(
+new AlterIsrResponseData.PartitionData()
+  .setPartitionIndex(tp.partition)
+  .setLeaderId(leaderAndIsr.leader)
+  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+  .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+}
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+val response = try {
+  val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+  // Determine which partitions we will accept the new ISR for
+  val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+  val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+case Some(leaderIsrAndControllerEpoch) =>
+  val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+  if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
   Consider the following scenario:
   
   1) broker sends AlterIsr
   2) the update succeeds but the response is lost
   3) broker retries AlterIsr
   
   Currently the leader will be stuck after 3) because it has no way to get the 
latest LeaderAndIsr state once the response to the first attempt is lost. To handle this, I think 
we need to add an idempotence check here. After we have validated the leader 
epoch, if the intended state matches the current state, then we can just return 
the current state. 

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+val isrToSendOpt: 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490530496



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults.groupBy(_._1.topic).foreach { entry =>

Review comment:
   nit: can we avoid using `_1` and `_2`? It's a lot easier to follow if 
they are named.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490527627



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+val isrToSend: Option[Set[Int]] = isrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+None
+}
+if (isrToSend.isDefined) {
+  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.get.toList, zkVersion)
+  val callbackPartial = handleAlterIsrResponse(isrToSend.get, _ : 
Either[Errors, LeaderAndIsr])
+  alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+} else {
+  false
+}
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+inWriteLock(leaderIsrUpdateLock) {
+  result match {
+case Left(error: Errors) => error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+  case Errors.FENCED_LEADER_EPOCH =>
+debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+  case _ =>

Review comment:
   Since `INVALID_UPDATE_VERSION` is one of the expected errors at this 
level, can we add a separate case for it? For unexpected errors, we might want 
to log at warn level.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults.groupBy(_._1.topic).foreach { entry =>
+val topicResp = new AlterIsrResponseData.TopicData()
+  .setName(entry._1)
+  .setPartitions(new util.ArrayList())
+resp.topics.add(topicResp)
+entry._2.foreach { partitionEntry =>
+  partitionEntry._2 match {
+case Left(error) => topicResp.partitions.add(
+  new AlterIsrResponseData.PartitionData()
+.setPartitionIndex(partitionEntry._1.partition)
+.setErrorCode(error.code))
+case Right(leaderAndIsr) => topicResp.partitions.add(
+  new AlterIsrResponseData.PartitionData()
+.setPartitionIndex(partitionEntry._1.partition)
+.setLeaderId(leaderAndIsr.leader)
+.setLeaderEpoch(leaderAndIsr.leaderEpoch)
+.setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+.setCurrentIsrVersion(leaderAndIsr.zkVersion))
+  }
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490464756



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -157,6 +158,44 @@ case class OngoingReassignmentState(addingReplicas: 
Seq[Int],
 
 case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
 
+
+
+sealed trait IsrState {
+  /**
+   * Includes only the in-sync replicas which have been committed to ZK.
+   */
+  def isr: Set[Int]
+
+  /**
+   * This set may include un-committed ISR members following an expansion. 
This "effective" ISR is used for advancing
+   * the high watermark as well as determining which replicas are required for 
acks=all produce requests.
+   *
+   * Only applicable as of IBP 2.7-IV2, for older versions this will return 
the committed ISR
+   *
+   */
+  def maximalIsr: Set[Int]
+
+  /**
+   * Indicates if we have an AlterIsr request inflight.
+   */
+  def inflight: Boolean

Review comment:
   nit: `hasInflight`?

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -700,19 +764,16 @@ class Partition(val topicPartition: TopicPartition,
   inWriteLock(leaderIsrUpdateLock) {
 // check if this replica needs to be added to the ISR
 if (needsExpandIsr(followerReplica)) {
-  val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
-  info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
-  // update ISR in ZK and cache
-  expandIsr(newInSyncReplicaIds)
+  expandIsr(followerReplica.brokerId)
 }
   }
 }
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-leaderLogIfLocal.exists { leaderLog =>
+!hasInFlightAlterIsr && leaderLogIfLocal.exists { leaderLog =>

Review comment:
   I think we can refactor this a little bit to avoid some duplication and 
inconsistency. We have the following logic above when updating follower state:
   ```scala
   if (!isrState.maximalIsr.contains(followerId))
 maybeExpandIsr(followerReplica, followerFetchTimeMs)
   ```
   This is a little inconsistent because here we are checking `isrState.isr`. 
I'd suggest splitting this method into something like the following:
   
   ```scala
   def hasReachedHighWatermark(follower: Replica): Boolean = {
 leaderLogIfLocal.exists { leaderLog =>
   val leaderHighwatermark = leaderLog.highWatermark
   isFollowerInSync(follower, leaderHighwatermark)
 }
   }
   
   def canAddToIsr(followerId: Int): Boolean = {
 val current = isrState
 !current.inflight && !current.isr.contains(followerId)
   }
   
   def needsExpandIsr(follower: Replica): Boolean = {
 canAddToIsr(follower.brokerId) && hasReachedHighWatermark(follower)
   }
   ```
   
   Then we can change the logic in `maybeExpandIsr` to the following:
   ```scala
   val needsIsrUpdate = canAddToIsr(followerReplica.brokerId) && 
inReadLock(leaderIsrUpdateLock) {
   ...
   ```
   
   

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+val isrToSend: Option[Set[Int]] = isrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+None
+}
+if (isrToSend.isDefined) {

Review comment:
   nit: can probably rework this as `exists`
   ```scala
   isrToSendOpt.exists { isrToSend =>
   ...
   }
   ```

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition,
 // since the replica's logStartOffset may have incremented
 val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-// check if we need to expand ISR to include this replica
-// if it is not in the ISR yet
-if (!inSyncReplicaIds.contains(followerId))
+// Check if this in-sync replica needs to be added to the ISR. We look 
at the "maximal" ISR here so we don't
+// send an additional Alter ISR request for the same replica

Review comment:
   Another possibility is that the replica is pending removal, in which case 
another `AlterIsr` will be needed. I think it might be more intuitive to make 
this check:
   
   ```scala
   if (!isrState.inflight && !isrState.isr.contains(followerId))
   ```

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -858,10 +925,10 @@ class Partition(val topicPartition: TopicPartition,
 case Some(leaderLog) =>
   val outOfSyncReplicaIds = 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-16 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r489569400



##
File path: clients/src/main/resources/common/message/AlterIsrRequest.json
##
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 52,
+  "type": "request",
+  "name": "AlterIsrRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the requesting broker" },
+{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": 
"-1",
+  "about": "The epoch of the requesting broker" },
+{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
+  { "name":  "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+"about": "The name of the topic to alter ISRs for" },
+  { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", 
"fields": [
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index" },
+{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",

Review comment:
   I know we've gone back and forth on including some of these fields. This 
is one I'm inclined to get rid of since we already include "BrokerId" at the 
top level and `AlterIsr` can only be sent by leaders.

##
File path: clients/src/main/resources/common/message/AlterIsrResponse.json
##
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 52,
+  "type": "response",
+  "name": "AlterIsrResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top level response error code" },
+{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
+  { "name":  "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+"about": "The name of the topic" },
+  { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", 
"fields": [
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index" },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition level error code" },
+{ "name": "Leader", "type": "int32", "versions": "0+", "entityType": 
"brokerId",

Review comment:
   nit: shall we call this `LeaderId` in line with `BrokerId` in the 
request?

##
File path: config/log4j.properties
##
@@ -76,8 +76,8 @@ log4j.additivity.kafka.request.logger=false
 log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
 
-log4j.logger.kafka.controller=TRACE, controllerAppender
-log4j.additivity.kafka.controller=false
+log4j.logger.kafka.controller=DEBUG, controllerAppender

Review comment:
   Can we revert this change? I think the trace logging is intended, if a 
bit odd.

##
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##
@@ -100,7 +100,9 @@ object 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r477618083



##
File path: clients/src/main/resources/common/message/AlterIsrResponse.json
##
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "AlterIsrResponse",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top level response error code" },
+{ "name": "Topics", "type": "[]AlterIsrResponseTopics", "versions": "0+", 
"fields": [

Review comment:
   nit: I think `AlterIsrResponseTopics` should be singular (similarly for 
other arrays in both of these schemas). 
   
   Also, I wonder if it's reasonable to leave off the `AlterIsr` prefix. We 
could access it as `AlterIsrResponse.TopicData` or something like that.

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+if (useAlterIsr) {
+  expandIsrWithAlterIsr(newInSyncReplica)
+} else {
+  expandIsrWithZk(newInSyncReplica)
+}
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+// This is called from maybeExpandIsr which holds the ISR write lock
+if (pendingInSyncReplicaIds.isEmpty) {
+  // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
+  // a more constrained state for advancing the HW.
+  val newIsr = inSyncReplicaIds + newInSyncReplica
+  pendingInSyncReplicaIds = Some(newIsr)
+  debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${newIsr.mkString(",")}] for $topicPartition")
+  alterIsr(newIsr)
+} else {
+  debug(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica for $topicPartition")

Review comment:
   Maybe trace would be better? This could get verbose while we have an 
inflight AlterIsr.

##
File path: clients/src/main/resources/common/message/AlterIsrRequest.json
##
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "AlterIsrRequest",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
   We may as well add flexible version support for the request and response.

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1234,6 +1314,36 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def alterIsr(newIsr: Set[Int]): Unit = {
+val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+alterIsrManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr, {
+  inWriteLock(leaderIsrUpdateLock) {
+case Errors.NONE =>
+  debug(s"Controller accepted 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469388095



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -748,7 +755,7 @@ class Partition(val topicPartition: TopicPartition,
 leaderLogIfLocal match {
   case Some(leaderLog) =>
 // keep the current immutable replica list reference
-val curInSyncReplicaIds = inSyncReplicaIds
+val curInSyncReplicaIds = effectiveInSyncReplicaIds

Review comment:
   Related to the other comment, but we need to be careful with the min.isr 
check below. I think it is correct to wait for `effectiveInSyncReplicaIds` 
before acknowledging the produce request, but we should probably use the size 
of `inSyncReplicaIds` in the min.isr check since that is the only set we can 
guarantee.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterIsrRequest.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class AlterIsrRequest extends AbstractRequest {
+
+private final AlterIsrRequestData data;
+
+public AlterIsrRequest(AlterIsrRequestData data, short apiVersion) {
+super(ApiKeys.ALTER_ISR, apiVersion);
+this.data = data;
+}
+
+public AlterIsrRequestData data() {
+return data;
+}
+
+@Override
+protected Struct toStruct() {
+return data.toStruct(version());
+}
+
+/**
+ * Get an error response for a request with specified throttle time in the 
response if applicable
+ *
+ * @param throttleTimeMs

Review comment:
   nit: maybe drop the parameters if they do not need to be documented

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -255,6 +255,10 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  // For advancing the HW we assume the largest ISR even if the controller 
hasn't made the change yet
+  // This set includes the latest ISR (as we learned from LeaderAndIsr) and 
any replicas from a pending ISR expansion
+  def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | 
pendingInSyncReplicaIds

Review comment:
   We might need to be careful about performance here since this would get 
called on every follower fetch.

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -485,13 +490,11 @@ class Partition(val topicPartition: TopicPartition,
   def makeLeader(partitionState: LeaderAndIsrPartitionState,

Review comment:
   There is a "classic" edge case in Kafka which goes as follows:
   
   1. Leader is 1, ISR is [1, 2, 3]
   2. Broker 3 begins controlled shutdown. While awaiting shutdown, it 
continues fetching.
   3. Controller bumps epoch and shrinks ISR to [1, 2] and notifies replicas
   4. Before controlled shutdown completes and 3 stops fetching, the leader 
adds it back to the ISR.
   
   This bug was fixed by KIP-320 which added epoch validation to the Fetch API. 
After shrinking the ISR in step 3, the controller will send `LeaderAndIsr` with 
the updated epoch to [1, 2] and `StopReplica` to [3]. So 3 will not send any 
fetches with the updated epoch, which means it's impossible for the leader to 
add 3 back after observing the shrink to [1, 2]. 
   
   I just want to confirm that the above is correct and whether `AlterIsr` 
changes it in any way. I think the answer is no as long as ISR expansion is 
_only_ done in response to a fetch request, but it's worth double-checking.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1756,6 +1761,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => 

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464718819



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   To be clear, I'm not questioning the need for the validation, just the 
fact that it is done before enqueueing the event instead of when the event is 
processed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464705276



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2956,6 +2956,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+val alterIsrRequest = request.body[AlterIsrRequest]
+
+authorizeClusterOperation(request, CLUSTER_ACTION);
+
+// TODO do we need throttling for this response?

Review comment:
   Probably reasonable to handle it the same way other inter-broker RPCs 
are handled.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?

Review comment:
   Good question. It might be fair to assume the controller is correct and use 
STALE_BROKER_EPOCH. Once KIP-500 is complete, it would be totally fair since 
the controller will be guaranteed to have the latest state. The other question 
is what the broker should do if it sees STALE_BROKER_EPOCH...

##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates += alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+}
+  }
+
+  override def startup(): Unit = {
+scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 
2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
   Hmm.. This adds a delay of 2.5s to every ISR change, which is a bit 
annoying. I guess the point is to allow batching? I think a better approach 
might be to send requests immediately on arrival, but set a limit on the 
maximum number of in-flight requests (maybe just 1) and let the changes 
accumulate when there is a request in-flight. Then we can still get a big 
batching benefit when there are a large number of ISR changes that need to be 
sent in a hurry.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+