hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490527627
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
}
}
+ private def sendAlterIsrRequest(): Boolean = {
+ val isrToSend: Option[Set[Int]] = isrState match {
+ case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
+ case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
+ case CommittedIsr(_) =>
+ error(s"Asked to send AlterIsr but there are no pending updates")
+ None
+ }
+ if (isrToSend.isDefined) {
+ val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.get.toList, zkVersion)
+ val callbackPartial = handleAlterIsrResponse(isrToSend.get, _ : Either[Errors, LeaderAndIsr])
+ alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+ } else {
+ false
+ }
+ }
+
+ private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+ inWriteLock(leaderIsrUpdateLock) {
+ result match {
+ case Left(error: Errors) => error match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+ debug(s"Controller failed to update ISR to
${proposedIsr.mkString(",")} since it doesn't know about this topic or
partition. Giving up.")
+ case Errors.FENCED_LEADER_EPOCH =>
+ debug(s"Controller failed to update ISR to
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+ case _ =>
Review comment:
Since `INVALID_UPDATE_VERSION` is one of the expected errors at this
level, can we add a separate case for it? For unexpected errors, we might want
to log at warn level.
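A minimal sketch of what that could look like, assuming the surrounding `error match` from the diff above (the retry-vs-give-up wording is illustrative, not necessarily the PR's final behaviour):
```scala
case Errors.INVALID_UPDATE_VERSION =>
  debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} because the " +
    "ISR version was stale. Will retry with the latest state.")
case _ =>
  warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to " +
    s"unexpected error $error")
```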
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
+ val topicResp = new AlterIsrResponseData.TopicData()
+ .setName(entry._1)
+ .setPartitions(new util.ArrayList())
+ resp.topics.add(topicResp)
+ entry._2.foreach { partitionEntry =>
+ partitionEntry._2 match {
+ case Left(error) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setErrorCode(error.code))
+ case Right(leaderAndIsr) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setLeaderId(leaderAndIsr.leader)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+ }
+ }
+ }
+ }
+ callback.apply(resp)
+ }
+
+ eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+ }
+
+ private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+ callback: AlterIsrCallback): Unit = {
+
+ // Handle a few short-circuits
+ if (!isActive) {
+ callback.apply(Right(Errors.NOT_CONTROLLER))
+ return
+ }
+
+ val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+ if (brokerEpochOpt.isEmpty) {
+ info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ if (!brokerEpochOpt.contains(brokerEpoch)) {
+ info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for
broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ val response = try {
+ val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+ mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+ // Determine which partitions we will accept the new ISR for
+ val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+ case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+ val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+ Errors.FENCED_LEADER_EPOCH
+ } else {
+ Errors.NONE
+ }
+ case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+ }
+ if (partitionError == Errors.NONE) {
+ Some(tp -> newLeaderAndIsr)
+ } else {
+ partitionResponses(tp) = Left(partitionError)
+ None
+ }
+ }
+
+ // Do the updates in ZK
+ debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+ val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+ adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+ val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+ case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+ isrOrError match {
+ case Right(updatedIsr) =>
+ debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+ partitionResponses(partition) = Right(updatedIsr)
+ Some(partition -> updatedIsr)
+ case Left(error) =>
+ warn(s"Failed to update ISR for partition $partition", error)
+ partitionResponses(partition) = Left(Errors.forException(error))
+ None
+ }
+ }
+
+ badVersionUpdates.foreach(partition => {
+ warn(s"Failed to update ISR for partition $partition, bad ZK version")
+ partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+ })
+
+ def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
+ val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
+ debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
+ sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
+ }
+
+ // Update our cache and send out metadata updates
+ updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+ processUpdateNotifications(isrsToAlter.keys.toSeq)
+
+ Left(partitionResponses)
+ } catch {
+ case e: Throwable =>
+ error(s"Error when processing AlterIsr request", e)
Review comment:
Shall we include some details about the failed request?
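For example (a sketch only; exactly which request fields are worth logging is open):
```scala
case e: Throwable =>
  error(s"Error when processing AlterIsr request from broker $brokerId " +
    s"(broker epoch $brokerEpoch) for partitions ${isrsToAlter.keySet}", e)
```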
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
+ val topicResp = new AlterIsrResponseData.TopicData()
+ .setName(entry._1)
+ .setPartitions(new util.ArrayList())
+ resp.topics.add(topicResp)
+ entry._2.foreach { partitionEntry =>
+ partitionEntry._2 match {
+ case Left(error) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setErrorCode(error.code))
+ case Right(leaderAndIsr) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setLeaderId(leaderAndIsr.leader)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+ }
+ }
+ }
+ }
+ callback.apply(resp)
+ }
+
+ eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+ }
+
+ private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+ callback: AlterIsrCallback): Unit = {
+
+ // Handle a few short-circuits
+ if (!isActive) {
+ callback.apply(Right(Errors.NOT_CONTROLLER))
+ return
+ }
+
+ val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+ if (brokerEpochOpt.isEmpty) {
+ info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ if (!brokerEpochOpt.contains(brokerEpoch)) {
+ info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for
broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ val response = try {
+ val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+ mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+ // Determine which partitions we will accept the new ISR for
+ val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+ case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+ val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+ Errors.FENCED_LEADER_EPOCH
+ } else {
+ Errors.NONE
+ }
+ case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+ }
+ if (partitionError == Errors.NONE) {
+ Some(tp -> newLeaderAndIsr)
+ } else {
+ partitionResponses(tp) = Left(partitionError)
+ None
+ }
+ }
+
+ // Do the updates in ZK
+ debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+ val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+ adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+ val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+ case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+ isrOrError match {
+ case Right(updatedIsr) =>
+ debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
Review comment:
nit: rewrite with `s` string interpolation (`$`) instead of `format`.
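Something along these lines:
```scala
debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] " +
  s"and zkVersion updated to [${updatedIsr.zkVersion}]")
```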
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
+ val topicResp = new AlterIsrResponseData.TopicData()
+ .setName(entry._1)
+ .setPartitions(new util.ArrayList())
+ resp.topics.add(topicResp)
+ entry._2.foreach { partitionEntry =>
+ partitionEntry._2 match {
+ case Left(error) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setErrorCode(error.code))
+ case Right(leaderAndIsr) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setLeaderId(leaderAndIsr.leader)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+ }
+ }
+ }
+ }
+ callback.apply(resp)
+ }
+
+ eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+ }
+
+ private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+ callback: AlterIsrCallback): Unit = {
+
+ // Handle a few short-circuits
+ if (!isActive) {
+ callback.apply(Right(Errors.NOT_CONTROLLER))
+ return
+ }
+
+ val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+ if (brokerEpochOpt.isEmpty) {
+ info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ if (!brokerEpochOpt.contains(brokerEpoch)) {
+ info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for
broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ val response = try {
+ val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+ mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+ // Determine which partitions we will accept the new ISR for
+ val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+ case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+ val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+ Errors.FENCED_LEADER_EPOCH
+ } else {
+ Errors.NONE
+ }
+ case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+ }
+ if (partitionError == Errors.NONE) {
+ Some(tp -> newLeaderAndIsr)
+ } else {
+ partitionResponses(tp) = Left(partitionError)
+ None
+ }
+ }
+
+ // Do the updates in ZK
+ debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+ val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+ adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+ val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+ case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+ isrOrError match {
+ case Right(updatedIsr) =>
+ debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+ partitionResponses(partition) = Right(updatedIsr)
+ Some(partition -> updatedIsr)
+ case Left(error) =>
+ warn(s"Failed to update ISR for partition $partition", error)
+ partitionResponses(partition) = Left(Errors.forException(error))
+ None
+ }
+ }
+
+ badVersionUpdates.foreach(partition => {
+ warn(s"Failed to update ISR for partition $partition, bad ZK version")
Review comment:
I think `warn` might be too high here. We should expect to see some of
these even if the cluster is working properly. How about debug?
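i.e. something like:
```scala
badVersionUpdates.foreach { partition =>
  debug(s"Failed to update ISR for partition $partition, bad ZK version")
  partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
}
```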
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3054,6 +3054,26 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+ val alterIsrRequest = request.body[AlterIsrRequest]
+
+ if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
Review comment:
nit: it's subjective, so feel free to ignore, but I find this a little
easier to read if we handle the error cases first. So..
```scala
if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
  sendResponseMaybeThrottle(request, requestThrottleMs =>
    alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else if (!controller.isActive) {
  sendResponseMaybeThrottle(request, requestThrottleMs =>
    alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception()))
} else {
...
}
```
Basically we're discarding the error cases so that the successful path
continues flowing downward and we're avoiding extra nesting. Like I said, it's
subjective.
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
+ val topicResp = new AlterIsrResponseData.TopicData()
+ .setName(entry._1)
+ .setPartitions(new util.ArrayList())
+ resp.topics.add(topicResp)
+ entry._2.foreach { partitionEntry =>
+ partitionEntry._2 match {
+ case Left(error) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setErrorCode(error.code))
+ case Right(leaderAndIsr) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setLeaderId(leaderAndIsr.leader)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+ }
+ }
+ }
+ }
+ callback.apply(resp)
+ }
+
+ eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+ }
+
+ private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+ callback: AlterIsrCallback): Unit = {
+
+ // Handle a few short-circuits
+ if (!isActive) {
+ callback.apply(Right(Errors.NOT_CONTROLLER))
+ return
+ }
+
+ val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+ if (brokerEpochOpt.isEmpty) {
+ info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ if (!brokerEpochOpt.contains(brokerEpoch)) {
+ info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for
broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ val response = try {
+ val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
Review comment:
nit: use type inference. It's conventional to write this as
```scala
val partitionResponses = mutable.Map.empty[TopicPartition, Either[Errors, LeaderAndIsr]]
```
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
+ val topicResp = new AlterIsrResponseData.TopicData()
+ .setName(entry._1)
+ .setPartitions(new util.ArrayList())
+ resp.topics.add(topicResp)
+ entry._2.foreach { partitionEntry =>
+ partitionEntry._2 match {
+ case Left(error) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setErrorCode(error.code))
+ case Right(leaderAndIsr) => topicResp.partitions.add(
+ new AlterIsrResponseData.PartitionData()
+ .setPartitionIndex(partitionEntry._1.partition)
+ .setLeaderId(leaderAndIsr.leader)
+ .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+ .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+ }
+ }
+ }
+ }
+ callback.apply(resp)
+ }
+
+ eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+ }
+
+ private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+ callback: AlterIsrCallback): Unit = {
+
+ // Handle a few short-circuits
+ if (!isActive) {
+ callback.apply(Right(Errors.NOT_CONTROLLER))
+ return
+ }
+
+ val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+ if (brokerEpochOpt.isEmpty) {
+ info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ if (!brokerEpochOpt.contains(brokerEpoch)) {
+ info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for
broker $brokerId")
+ callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+ return
+ }
+
+ val response = try {
+ val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
+ mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+
+ // Determine which partitions we will accept the new ISR for
+ val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap {
+ case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+ val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) {
+ Errors.FENCED_LEADER_EPOCH
+ } else {
+ Errors.NONE
+ }
+ case None => Errors.UNKNOWN_TOPIC_OR_PARTITION
+ }
+ if (partitionError == Errors.NONE) {
+ Some(tp -> newLeaderAndIsr)
+ } else {
+ partitionResponses(tp) = Left(partitionError)
+ None
+ }
+ }
+
+ // Do the updates in ZK
+ debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+ val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+ adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+ val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = finishedUpdates.flatMap {
+ case (partition: TopicPartition, isrOrError: Either[Throwable, LeaderAndIsr]) =>
+ isrOrError match {
+ case Right(updatedIsr) =>
+ debug("ISR for partition %s updated to [%s] and zkVersion updated to [%d]".format(partition, updatedIsr.isr.mkString(","), updatedIsr.zkVersion))
+ partitionResponses(partition) = Right(updatedIsr)
+ Some(partition -> updatedIsr)
+ case Left(error) =>
+ warn(s"Failed to update ISR for partition $partition", error)
+ partitionResponses(partition) = Left(Errors.forException(error))
+ None
+ }
+ }
+
+ badVersionUpdates.foreach(partition => {
+ warn(s"Failed to update ISR for partition $partition, bad ZK version")
+ partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+ })
+
+ def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
+ val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
+ debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
Review comment:
nit: I think we can get rid of this. The logging in
`ControllerChannelManager.sendUpdateMetadataRequests` is probably good enough.
##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -298,9 +298,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startProcessingRequests = false)
+
Review comment:
nit: unneeded newline
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1358,7 +1366,8 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id
$correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since
its associated " +
- s"leader epoch $requestLeaderEpoch matches the current
leader epoch")
+ s"leader epoch $requestLeaderEpoch matches the current
leader epoch " +
+ s"and the zk version $requestZkVersion matches the current
zk version")
Review comment:
nit: not sure it makes sense to include this change any longer
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+ val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+ alterIsrRequest.topics.forEach { topicReq =>
+ topicReq.partitions.forEach { partitionReq =>
+ val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+ val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+ isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+ }
+ }
+
+ def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+ val resp = new AlterIsrResponseData()
+ results match {
+ case Right(error) =>
+ resp.setErrorCode(error.code)
+ case Left(partitionResults) =>
+ resp.setTopics(new util.ArrayList())
+ partitionResults.groupBy(_._1.topic).foreach { entry =>
Review comment:
nit: can we avoid using `_1` and `_2`? It's a lot easier to follow if they are named.
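For instance (a sketch; the names are illustrative):
```scala
partitionResults.groupBy { case (tp, _) => tp.topic }.foreach { case (topic, partitions) =>
  val topicResp = new AlterIsrResponseData.TopicData()
    .setName(topic)
    .setPartitions(new util.ArrayList())
  resp.topics.add(topicResp)
  partitions.foreach { case (tp, errorOrIsr) =>
    // build the PartitionData from tp.partition and errorOrIsr as in the diff above
  }
}
```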
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -200,9 +241,11 @@ class Partition(val topicPartition: TopicPartition,
// defined when this broker is leader for partition
@volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
@volatile var leaderReplicaIdOpt: Option[Int] = None
- @volatile var inSyncReplicaIds = Set.empty[Int]
+ @volatile var isrState: IsrState = CommittedIsr(Set.empty)
Review comment:
I wonder if we should be exposing this. Would it be enough to have a
`def inSyncReplicaIds = isrState.isr`? One thing we need to be a little careful
of is the fact that we now have a volatile variable with multiple fields. So if
you try to access two fields through the `isrState` reference, you could see
inconsistent data.
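A rough sketch of that idea (`isrSummary` is just an illustrative helper; `maximalIsr` is taken from the test code below):
```scala
def inSyncReplicaIds: Set[Int] = isrState.isr

// When more than one field is needed, read the volatile reference once
// so both values come from the same snapshot.
def isrSummary: String = {
  val snapshot = isrState
  s"isr=${snapshot.isr.mkString(",")} maximalIsr=${snapshot.maximalIsr.mkString(",")}"
}
```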
##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1257,20 +1250,18 @@ class PartitionTest extends AbstractPartitionTest {
// On initialization, the replica is considered caught up and should not be removed
partition.maybeShrinkIsr()
- assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
+ assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
// If enough time passes without a fetch update, the ISR should shrink
time.sleep(partition.replicaLagTimeMaxMs + 1)
- val updatedLeaderAndIsr = LeaderAndIsr(
- leader = brokerId,
- leaderEpoch = leaderEpoch,
- isr = List(brokerId),
- zkVersion = 1)
- when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
+ // Shrink the ISR
partition.maybeShrinkIsr()
- assertEquals(Set(brokerId), partition.inSyncReplicaIds)
- assertEquals(10L, partition.localLogOrException.highWatermark)
+ assertEquals(alterIsrManager.isrUpdates.size, 1)
Review comment:
I may have missed it, but do we have tests which verify error handling?
I see tests which verify requests get sent, but at a quick glance I didn't see
tests of responses.
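For example, something roughly like this could exercise the response path (a sketch only; it assumes the `callback` stored on the queued `AlterIsrItem` is reachable from the test helper, which may not match the final code):
```scala
@Test
def testIsrNotExpandedIfAlterIsrFails(): Unit = {
  // ... set up the partition as leader and catch the follower up so an
  // ISR expansion is enqueued ...
  assertEquals(1, alterIsrManager.isrUpdates.size)
  val inflight = alterIsrManager.isrUpdates.dequeue()

  // Simulate the controller rejecting the update
  inflight.callback(Left(Errors.INVALID_UPDATE_VERSION))

  // The committed ISR should be unchanged
  assertEquals(Set(brokerId), partition.isrState.isr)
}
```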
##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1065,6 +1065,25 @@ object TestUtils extends Logging {
logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
}
+ class TestAlterIsrManager extends AlterIsrManager {
Review comment:
nit: it's sort of conventional to use a name like `MockAlterIsrManager`.
##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1149,31 +1153,27 @@ class PartitionTest extends AbstractPartitionTest {
followerFetchTimeMs = time.milliseconds(),
leaderEndOffset = 6L)
- assertEquals(Set(brokerId), partition.inSyncReplicaIds)
+ assertEquals(Set(brokerId), partition.isrState.isr)
assertEquals(3L, remoteReplica.logEndOffset)
assertEquals(0L, remoteReplica.logStartOffset)
- // The next update should bring the follower back into the ISR
- val updatedLeaderAndIsr = LeaderAndIsr(
- leader = brokerId,
- leaderEpoch = leaderEpoch,
- isr = List(brokerId, remoteBrokerId),
- zkVersion = 1)
- when(stateStore.expandIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
-
partition.updateFollowerFetchState(remoteBrokerId,
followerFetchOffsetMetadata = LogOffsetMetadata(10),
followerStartOffset = 0L,
followerFetchTimeMs = time.milliseconds(),
leaderEndOffset = 6L)
- assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
+ assertEquals(alterIsrManager.isrUpdates.size, 1)
+ assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, List(brokerId, remoteBrokerId))
+ assertEquals(Set(brokerId), partition.isrState.isr)
+ assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
assertEquals(10L, remoteReplica.logEndOffset)
assertEquals(0L, remoteReplica.logStartOffset)
}
@Test
def testIsrNotExpandedIfUpdateFails(): Unit = {
+ // TODO maybe remove this test now?
Review comment:
Need to address the TODOs in this class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]