This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8839514  MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
8839514 is described below

commit 8839514efb9f037462541b8c1957a47ec19a1565
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Dec 1 11:17:08 2020 -0800

    MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
    
    A few small cleanups in `Partition` handling of `AlterIsr`:
    
    - Factor state update and log message into `sendAlterIsrRequest`
    - Ensure illegal state error gets raised if a retry fails to be enqueued
    - Always check the proposed state against the current state in `handleAlterIsrResponse`
    - Add `toString` implementations to `IsrState` case classes
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 100 +++++++++++++---------
 1 file changed, 61 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7b5d6b7..f6a9e83 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -181,21 +181,47 @@ sealed trait IsrState {
   def isInflight: Boolean
 }
 
-case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends 
IsrState {
+case class PendingExpandIsr(
+  isr: Set[Int],
+  newInSyncReplicaId: Int
+) extends IsrState {
   val maximalIsr = isr + newInSyncReplicaId
   val isInflight = true
+
+  override def toString: String = {
+    s"PendingExpandIsr(isr=$isr" +
+      s", newInSyncReplicaId=$newInSyncReplicaId" +
+      ")"
+  }
 }
 
-case class PendingShrinkIsr(isr: Set[Int], outOfSyncReplicaIds: Set[Int]) 
extends IsrState  {
+case class PendingShrinkIsr(
+  isr: Set[Int],
+  outOfSyncReplicaIds: Set[Int]
+) extends IsrState  {
   val maximalIsr = isr
   val isInflight = true
+
+  override def toString: String = {
+    s"PendingShrinkIsr(isr=$isr" +
+      s", outOfSyncReplicaIds=$outOfSyncReplicaIds" +
+      ")"
+  }
 }
 
-case class CommittedIsr(isr: Set[Int]) extends IsrState {
+case class CommittedIsr(
+  isr: Set[Int]
+) extends IsrState {
   val maximalIsr = isr
   val isInflight = false
+
+  override def toString: String = {
+    s"CommittedIsr(isr=$isr" +
+      ")"
+  }
 }
 
+
 /**
  * Data structure that represents a topic partition. The leader maintains the 
AR, ISR, CUR, RAR
  *
@@ -1286,14 +1312,7 @@ class Partition(val topicPartition: TopicPartition,
     if (!isrState.isInflight) {
       // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
       // a more constrained state for advancing the HW.
-      val proposedIsrState = PendingExpandIsr(isrState.isr, newInSyncReplica)
-      if (sendAlterIsrRequest(proposedIsrState)) {
-        // Only update our ISR state of AlterIsrManager accepts our update
-        debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${isrState.maximalIsr.mkString(",")}]")
-        isrState = proposedIsrState
-      } else {
-        throw new IllegalStateException("Failed to enqueue ISR expansion even 
though there was no apparent in-flight ISR changes")
-      }
+      sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
     } else {
       trace(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica")
     }
@@ -1321,13 +1340,7 @@ class Partition(val topicPartition: TopicPartition,
       // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
       // We update pendingInSyncReplicaIds here simply to prevent any further 
ISR updates from occurring until we get
       // the next LeaderAndIsr
-      val proposedIsrState = PendingShrinkIsr(isrState.isr, outOfSyncReplicas)
-      if (sendAlterIsrRequest(proposedIsrState)) {
-        debug(s"Removing out-of-sync replicas $outOfSyncReplicas")
-        isrState = proposedIsrState
-      } else {
-        throw new IllegalStateException("Failed to enqueue ISR shrink even 
though there was no apparent in-flight ISR changes")
-      }
+      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
     } else {
       trace(s"ISR update in-flight, not removing out-of-sync replicas 
$outOfSyncReplicas")
     }
@@ -1351,19 +1364,24 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
-    val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
-      case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
-      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
-      case CommittedIsr(_) =>
-        error(s"Asked to send AlterIsr but there are no pending updates")
-        None
+  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
+    val isrToSend: Set[Int] = proposedIsrState match {
+      case PendingExpandIsr(isr, newInSyncReplicaId) => isr + 
newInSyncReplicaId
+      case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- 
outOfSyncReplicaIds
+      case state =>
+        throw new IllegalStateException(s"Invalid state $state for `AlterIsr` 
request for partition $topicPartition")
     }
-    isrToSendOpt.exists { isrToSend =>
-      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
-      val callbackPartial = handleAlterIsrResponse(isrToSend, _ : 
Either[Errors, LeaderAndIsr])
-      alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, 
callbackPartial))
+
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
+    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState))
+
+    if (!alterIsrManager.enqueue(alterIsrItem)) {
+      throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request 
with state " +
+        s"$newLeaderAndIsr for partition $topicPartition")
     }
+
+    isrState = proposedIsrState
+    debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after 
transition to $proposedIsrState")
   }
 
   /**
@@ -1372,23 +1390,27 @@ class Partition(val topicPartition: TopicPartition,
    * Since our error was non-retryable we are okay staying in this state until 
we see new metadata from UpdateMetadata
    * or LeaderAndIsr
    */
-  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: 
Either[Errors, LeaderAndIsr]): Unit = {
+  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: 
Either[Errors, LeaderAndIsr]): Unit = {
     inWriteLock(leaderIsrUpdateLock) {
+      if (isrState != proposedIsrState) {
+        // This means isrState was updated through leader election or some 
other mechanism before we got the AlterIsr
+        // response. We don't know what happened on the controller exactly, 
but we do know this response is out of date
+        // so we ignore it.
+        debug(s"Ignoring failed ISR update to $proposedIsrState since we have 
already updated state to $isrState")
+        return
+      }
+
       result match {
         case Left(error: Errors) => error match {
           case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
-            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since it doesn't know about this topic or 
partition. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState since 
it doesn't know about this topic or partition. Giving up.")
           case Errors.FENCED_LEADER_EPOCH =>
-            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState since 
we sent an old leader epoch. Giving up.")
           case Errors.INVALID_UPDATE_VERSION =>
-            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
+            debug(s"Controller failed to update ISR to $proposedIsrState due 
to invalid zk version. Giving up.")
           case _ =>
-            if (isrState.isInflight) {
-              warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
-              sendAlterIsrRequest(isrState)
-            } else {
-              warn(s"Ignoring failed ISR update to 
${proposedIsr.mkString(",")} since due to $error since we have a committed 
ISR.")
-            }
+            warn(s"Controller failed to update ISR to $proposedIsrState due to 
unexpected $error. Retrying.")
+            sendAlterIsrRequest(proposedIsrState)
         }
         case Right(leaderAndIsr: LeaderAndIsr) =>
           // Success from controller, still need to check a few things

Reply via email to