This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8839514 MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
8839514 is described below
commit 8839514efb9f037462541b8c1957a47ec19a1565
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Dec 1 11:17:08 2020 -0800
MINOR: Small cleanups in `AlterIsr` handling logic (#9663)
A few small cleanups in `Partition` handling of `AlterIsr`:
- Factor state update and log message into `sendAlterIsrRequest`
- Ensure illegal state error gets raised if a retry fails to be enqueued
- Always check the proposed state against the current state in
`handleAlterIsrResponse`
- Add `toString` implementations to `IsrState` case classes
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 100 +++++++++++++---------
1 file changed, 61 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 7b5d6b7..f6a9e83 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -181,21 +181,47 @@ sealed trait IsrState {
def isInflight: Boolean
}
-case class PendingExpandIsr(isr: Set[Int], newInSyncReplicaId: Int) extends IsrState {
+case class PendingExpandIsr(
+ isr: Set[Int],
+ newInSyncReplicaId: Int
+) extends IsrState {
val maximalIsr = isr + newInSyncReplicaId
val isInflight = true
+
+ override def toString: String = {
+ s"PendingExpandIsr(isr=$isr" +
+ s", newInSyncReplicaId=$newInSyncReplicaId" +
+ ")"
+ }
}
-case class PendingShrinkIsr(isr: Set[Int], outOfSyncReplicaIds: Set[Int]) extends IsrState {
+case class PendingShrinkIsr(
+ isr: Set[Int],
+ outOfSyncReplicaIds: Set[Int]
+) extends IsrState {
val maximalIsr = isr
val isInflight = true
+
+ override def toString: String = {
+ s"PendingShrinkIsr(isr=$isr" +
+ s", outOfSyncReplicaIds=$outOfSyncReplicaIds" +
+ ")"
+ }
}
-case class CommittedIsr(isr: Set[Int]) extends IsrState {
+case class CommittedIsr(
+ isr: Set[Int]
+) extends IsrState {
val maximalIsr = isr
val isInflight = false
+
+ override def toString: String = {
+ s"CommittedIsr(isr=$isr" +
+ ")"
+ }
}
+
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
*
@@ -1286,14 +1312,7 @@ class Partition(val topicPartition: TopicPartition,
if (!isrState.isInflight) {
// When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
// a more constrained state for advancing the HW.
- val proposedIsrState = PendingExpandIsr(isrState.isr, newInSyncReplica)
- if (sendAlterIsrRequest(proposedIsrState)) {
- // Only update our ISR state of AlterIsrManager accepts our update
- debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${isrState.maximalIsr.mkString(",")}]")
- isrState = proposedIsrState
- } else {
- throw new IllegalStateException("Failed to enqueue ISR expansion even though there was no apparent in-flight ISR changes")
- }
+ sendAlterIsrRequest(PendingExpandIsr(isrState.isr, newInSyncReplica))
} else {
trace(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica")
}
@@ -1321,13 +1340,7 @@ class Partition(val topicPartition: TopicPartition,
// When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
// We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
// the next LeaderAndIsr
- val proposedIsrState = PendingShrinkIsr(isrState.isr, outOfSyncReplicas)
- if (sendAlterIsrRequest(proposedIsrState)) {
- debug(s"Removing out-of-sync replicas $outOfSyncReplicas")
- isrState = proposedIsrState
- } else {
- throw new IllegalStateException("Failed to enqueue ISR shrink even though there was no apparent in-flight ISR changes")
- }
+ sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
} else {
trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
}
@@ -1351,19 +1364,24 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def sendAlterIsrRequest(proposedIsrState: IsrState): Boolean = {
- val isrToSendOpt: Option[Set[Int]] = proposedIsrState match {
- case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
- case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
- case CommittedIsr(_) =>
- error(s"Asked to send AlterIsr but there are no pending updates")
- None
+ private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
+ val isrToSend: Set[Int] = proposedIsrState match {
+ case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
+ case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
+ case state =>
+ throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition")
}
- isrToSendOpt.exists { isrToSend =>
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
- val callbackPartial = handleAlterIsrResponse(isrToSend, _ : Either[Errors, LeaderAndIsr])
- alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+
+ val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
+ val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
+
+ if (!alterIsrManager.enqueue(alterIsrItem)) {
+ throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
+ s"$newLeaderAndIsr for partition $topicPartition")
}
+
+ isrState = proposedIsrState
+ debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState")
}
/**
@@ -1372,23 +1390,27 @@ class Partition(val topicPartition: TopicPartition,
* Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
* or LeaderAndIsr
*/
- private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+ private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = {
inWriteLock(leaderIsrUpdateLock) {
+ if (isrState != proposedIsrState) {
+ // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
+ // response. We don't know what happened on the controller exactly, but we do know this response is out of date
+ // so we ignore it.
+ debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
+ return
+ }
+
result match {
case Left(error: Errors) => error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
- debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.")
+ debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
case Errors.FENCED_LEADER_EPOCH =>
- debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+ debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
case Errors.INVALID_UPDATE_VERSION =>
- debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
+ debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
case _ =>
- if (isrState.isInflight) {
- warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.")
- sendAlterIsrRequest(isrState)
- } else {
- warn(s"Ignoring failed ISR update to ${proposedIsr.mkString(",")} since due to $error since we have a committed ISR.")
- }
+ warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
+ sendAlterIsrRequest(proposedIsrState)
}
case Right(leaderAndIsr: LeaderAndIsr) =>
// Success from controller, still need to check a few things