mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494523773



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1246,6 +1351,51 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
+    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
+      case CommittedIsr(_) =>
+        error(s"Asked to send AlterIsr but there are no pending updates")
+        None
+    }
+    isrToSendOpt.exists { isrToSend =>
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : Either[Errors, LeaderAndIsr])
+      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+    }
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+    inWriteLock(leaderIsrUpdateLock) {
+      result match {
+        case Left(error: Errors) => error match {
+          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.")
+          case Errors.FENCED_LEADER_EPOCH =>
+            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+          case Errors.INVALID_UPDATE_VERSION =>
+            debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
+            sendAlterIsrRequest(isrState)
+          case _ =>
+            warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.")
+            sendAlterIsrRequest(isrState)

Review comment:
True, we could see a new ISR from a controller-initiated change via
LeaderAndIsr while our request is in flight. We already check for this on
successful responses, but we should also check here. Since our request failed,
we don't have a leaderEpoch to check against, so I think the best we can do is
check whether `isrState` is still pending before re-sending the request.
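
A rough sketch of the kind of guard suggested here, using simplified stand-ins
for the state classes in the diff rather than the real `kafka.cluster` types.
`RetryGuard`, `isPending`, and `retryIfStillPending` are hypothetical names
invented for illustration, and `resend` stands in for `sendAlterIsrRequest`:

```scala
// Simplified stand-ins for the ISR state types seen in the diff (assumption:
// the real classes carry the same fields but live inside Partition.scala).
sealed trait IsrState { def isr: Set[Int] }
final case class CommittedIsr(isr: Set[Int]) extends IsrState
final case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends IsrState
final case class PendingShrinkIsr(isr: Set[Int], outOfSyncReplicaIds: Set[Int]) extends IsrState

object RetryGuard {
  // Hypothetical helper: true only while an ISR change is still awaiting
  // a response from the controller.
  def isPending(state: IsrState): Boolean = state match {
    case _: PendingExpandIsr | _: PendingShrinkIsr => true
    case _: CommittedIsr                           => false
  }

  // Hypothetical retry wrapper: invoke `resend` only if the current state is
  // still pending. Returns whether a request was actually re-sent.
  def retryIfStillPending(current: IsrState)(resend: IsrState => Boolean): Boolean =
    if (isPending(current)) resend(current) else false
}
```

With something like this, the two retry branches would skip the re-send when a
LeaderAndIsr from the controller has already moved the partition back to a
committed ISR while the failed request was in flight.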





