hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494480847



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1764,6 +1769,145 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+    val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+    alterIsrRequest.topics.forEach { topicReq =>
+      topicReq.partitions.forEach { partitionReq =>
+        val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+        val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+        isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, 
partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+      }
+    }
+
+    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
+      val resp = new AlterIsrResponseData()
+      results match {
+        case Right(error) =>
+          resp.setErrorCode(error.code)
+        case Left(partitionResults) =>
+          resp.setTopics(new util.ArrayList())
+          partitionResults
+            .groupBy { case (tp, _) => tp.topic }   // Group by topic
+            .foreach { case (topic, partitions) =>
+              // Add each topic part to the response
+              val topicResp = new AlterIsrResponseData.TopicData()
+                .setName(topic)
+                .setPartitions(new util.ArrayList())
+              resp.topics.add(topicResp)
+              partitions.foreach { case (tp, errorOrIsr) =>
+                // Add each partition part to the response (new ISR or error)
+                errorOrIsr match {
+                  case Left(error) => topicResp.partitions.add(
+                    new AlterIsrResponseData.PartitionData()
+                      .setPartitionIndex(tp.partition)
+                      .setErrorCode(error.code))
+                  case Right(leaderAndIsr) => topicResp.partitions.add(
+                    new AlterIsrResponseData.PartitionData()
+                      .setPartitionIndex(tp.partition)
+                      .setLeaderId(leaderAndIsr.leader)
+                      .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                      .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                }
+            }
+          }
+      }
+      callback.apply(resp)
+    }
+
+    eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, 
alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: 
Map[TopicPartition, LeaderAndIsr],
+                              callback: AlterIsrCallback): Unit = {
+
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Right(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    if (!brokerEpochOpt.contains(brokerEpoch)) {
+      info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpoch for 
broker $brokerId")
+      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      return
+    }
+
+    val response = try {
+      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+
+      // Determine which partitions we will accept the new ISR for
+      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
+        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
+          val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
+              if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {

Review comment:
       Consider the following scenario:
   
   1) broker sends AlterIsr
   2) the update succeeds but the response is lost
   3) broker retries AlterIsr
   
   Currently the leader will be stuck after 3) because it has no way to get the 
latest LeaderAndIsr state if the first attempt fails. To handle this, I think 
we need to add an idempotence check here. After we have validated the leader 
epoch, if the intended state matches the current state, then we can just return 
the current state. 

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    isrToSendOpt.exists { isrToSend =>
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : 
Either[Errors, LeaderAndIsr])
+      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+    }
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+    inWriteLock(leaderIsrUpdateLock) {
+      result match {
+        case Left(error: Errors) => error match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+          case Errors.FENCED_LEADER_EPOCH =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+          case Errors.INVALID_UPDATE_VERSION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
+            sendAlterIsrRequest(isrState)

Review comment:
       It might make more sense to handle this case similarly to 
FENCED_LEADER_EPOCH. Retrying won't help since we know our version will be 
rejected Come to think of it, this would be kind of a strange error to hit in 
the current implementation which only allows one request inflight at a time. 
For controller-initiated changes, we'd expect to hit FENCED_LEADER_EPOCH. 
Anyway, I think it's still worth keeping the error.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    isrToSendOpt.exists { isrToSend =>
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : 
Either[Errors, LeaderAndIsr])
+      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+    }
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+    inWriteLock(leaderIsrUpdateLock) {
+      result match {
+        case Left(error: Errors) => error match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+          case Errors.FENCED_LEADER_EPOCH =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+          case Errors.INVALID_UPDATE_VERSION =>
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
+            sendAlterIsrRequest(isrState)
+          case _ =>
+            warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+            sendAlterIsrRequest(isrState)

Review comment:
       Is there any way that we could end up retrying after the pending ISR 
state had already been reset? I know we have `AlterIsrManager.clearPending`, 
but that only removes the partition from the unsent queue. How do we handle 
inflight `AlterIsr` requests after the state has been reset. Seems like it 
might be worth adding a check here to validate whether the request is still 
needed.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1366,6 +1366,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  /**
+   * This is called for each partition in the body of an AlterIsr response. 
For errors which are non-retryable we simply

Review comment:
       nit: conventionally we prefer "retriable"

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1375,11 +1381,14 @@ class Partition(val topicPartition: TopicPartition,
           case Errors.FENCED_LEADER_EPOCH =>
             debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
           case Errors.INVALID_UPDATE_VERSION =>
-            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
-            sendAlterIsrRequest(isrState)
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
           case _ =>
-            warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
-            sendAlterIsrRequest(isrState)
+            if (isrState.isInflight) {
+              warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+              sendAlterIsrRequest(isrState)
+            } else {
+              warn(s"Ignoring failed ISR update to 
${proposedIsr.mkString(",")} since due to $error since we have a committed 
ISR.")

Review comment:
       nit (for follow-up): fix grammar "since due"

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1847,21 +1847,22 @@ class KafkaController(val config: KafkaConfig,
       // Determine which partitions we will accept the new ISR for
       val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
         case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          controllerContext.partitionLeadershipInfo(tp) match {
             case Some(leaderIsrAndControllerEpoch) =>
               val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
               if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
-                Errors.FENCED_LEADER_EPOCH
+                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+                None
+              } else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) {
+                // If a partition is already in the desired state, just return 
it

Review comment:
       It might be worth mentioning that this could happen in the case of a 
retry after a successful update.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3066,20 +3066,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
     val alterIsrRequest = request.body[AlterIsrRequest]
 
-    if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-      if (!controller.isActive) {
-        sendResponseMaybeThrottle(request, requestThrottleMs =>
-          alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))
-      } else {
-        controller.alterIsrs(alterIsrRequest.data,
-          alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs 
=>
-            new 
AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs))
-          )
-        )
-      }
-    } else {
+    if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    } else if (!controller.isActive) {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))

Review comment:
       nit: leave off parenthesis after `exception`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to