mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494520245



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults
+            .groupBy { case (tp, _) => tp.topic }   // Group by topic
+            .foreach { case (topic, partitions) =>
+              // Add each topic part to the response
+              val topicResp = new AlterIsrResponseData.TopicData()
+                .setName(topic)
+                .setPartitions(new util.ArrayList())
+              resp.topics.add(topicResp)
+              partitions.foreach { case (tp, errorOrIsr) =>
+                // Add each partition part to the response (new ISR or error)
+                errorOrIsr match {
+                  case Left(error) => topicResp.partitions.add(
+                    new AlterIsrResponseData.PartitionData()
+                      .setPartitionIndex(tp.partition)
+                      .setErrorCode(error.code))
+                  case Right(leaderAndIsr) => topicResp.partitions.add(
+                    new AlterIsrResponseData.PartitionData()
+                      .setPartitionIndex(tp.partition)
+                      .setLeaderId(leaderAndIsr.leader)
+                      .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                      .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
       I was trying to think some kind of race with a zombie leader trying to 
update the ISR, however this would get fenced by the leader epoch. This should 
be pretty easy to add

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    isrToSendOpt.exists { isrToSend =>
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : 
Either[Errors, LeaderAndIsr])
+      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+    }
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+    inWriteLock(leaderIsrUpdateLock) {
+      result match {
+        case Left(error: Errors) => error match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+          case Errors.FENCED_LEADER_EPOCH =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+          case Errors.INVALID_UPDATE_VERSION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
+            sendAlterIsrRequest(isrState)
+          case _ =>
+            warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+            sendAlterIsrRequest(isrState)

Review comment:
       True, we could see a new ISR from controller initiated changes via 
LeaderAndIsr while our request is in-flight. We have a check for this on 
successful responses, but we should also check here. Since our request failed, 
we don't have a leaderEpoch to check against so I think the best we can do is 
see if `isrState` is still pending before re-sending the request




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to