This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b485f92647 KAFKA-13790; ReplicaManager should be robust to all
partition updates from kraft metadata log (#12085)
b485f92647 is described below
commit b485f92647faecc3594bdf4164999d52c859b1bb
Author: David Jacot <[email protected]>
AuthorDate: Mon May 9 20:47:14 2022 +0200
KAFKA-13790; ReplicaManager should be robust to all partition updates from
kraft metadata log (#12085)
This patch refactors the `Partition.makeLeader` and
`Partition.makeFollower` to be robust to all partition updates from the KRaft
metadata log. Particularly, it ensures the following invariants:
- A partition update is accepted if the partition epoch is equal or newer.
The partition epoch is updated by the AlterPartition path as well so we accept
an update from the metadata log with the same partition epoch in order to fully
update the partition state.
- The leader epoch start offset is only updated when the leader epoch is
bumped.
- The follower states are only updated when the leader epoch is bumped.
- Fetchers are only restarted when the leader epoch is bumped. This was
already the case but this patch adds unit tests to prove/maintain it.
In the meantime, the patch unifies the state change logs to be similar in
both ZK and KRaft world.
Reviewers: Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 143 ++++++++------
.../main/scala/kafka/server/ReplicaManager.scala | 26 +--
.../scala/unit/kafka/cluster/PartitionTest.scala | 201 ++++++++++++++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 218 ++++++++++++++++++++-
4 files changed, 510 insertions(+), 78 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 126aa71e7c..907c0e3397 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -257,7 +257,7 @@ class Partition(val topicPartition: TopicPartition,
@volatile private var leaderEpoch: Int = LeaderAndIsr.InitialLeaderEpoch - 1
// start offset for 'leaderEpoch' above (leader epoch of the current leader
for this partition),
// defined when this broker is leader for partition
- @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
+ @volatile private[cluster] var leaderEpochStartOffsetOpt: Option[Long] = None
// Replica ID of the leader, defined when this broker is leader or follower
for the partition.
@volatile var leaderReplicaIdOpt: Option[Int] = None
@volatile private[cluster] var partitionState: PartitionState =
CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED)
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
- // record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
- // to maintain the decision maker controller's epoch in the zookeeper
path
+ // Partition state changes are expected to have a partition epoch
larger or equal
+ // to the current partition epoch. The latter is allowed because the
partition epoch
+ // is also updated by the AlterPartition response so the new epoch might
be known
+ // before a LeaderAndIsr request is received or before an update is
received via
+ // the metadata log.
+ if (partitionState.partitionEpoch < partitionEpoch) {
+ stateChangeLogger.info(s"Skipped the become-leader state change for
$topicPartition with topic id $topicId " +
+ s"and partition state $partitionState since the leader is already at
a newer partition epoch $partitionEpoch.")
+ return false
+ }
+
+ // Record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
+ // to maintain the decision maker controller's epoch in the zookeeper
path.
controllerEpoch = partitionState.controllerEpoch
+ val currentTimeMs = time.milliseconds
+ val isNewLeader = !isLeader
+ val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
val isr = partitionState.isr.asScala.map(_.toInt).toSet
val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
val removingReplicas =
partitionState.removingReplicas.asScala.map(_.toInt)
- if (partitionState.leaderRecoveryState ==
LeaderRecoveryState.RECOVERING.value()) {
- stateChangeLogger.info(
- s"The topic partition $topicPartition was marked as RECOVERING.
Leader log recovery is not implemented. " +
- "Marking the topic partition as RECOVERED."
- )
+ if (partitionState.leaderRecoveryState ==
LeaderRecoveryState.RECOVERING.value) {
+ stateChangeLogger.info(s"The topic partition $topicPartition was
marked as RECOVERING. " +
+ "Marking the topic partition as RECOVERED.")
}
+ // Updating the assignment and ISR state is safe if the partition epoch
is
+ // larger or equal to the current partition epoch.
updateAssignmentAndIsr(
assignment = partitionState.replicas.asScala.map(_.toInt),
isr = isr,
@@ -576,51 +590,60 @@ class Partition(val topicPartition: TopicPartition,
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.error(s"A ZooKeeper client exception has occurred
and makeLeader will be skipping the " +
- s"state change for the partition $topicPartition with leader
epoch: $leaderEpoch ", e)
-
+ s"state change for the partition $topicPartition with leader
epoch: $leaderEpoch.", e)
return false
}
val leaderLog = localLogOrException
- val leaderEpochStartOffset = leaderLog.logEndOffset
- stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch
${partitionState.leaderEpoch} from " +
- s"offset $leaderEpochStartOffset with high watermark
${leaderLog.highWatermark} " +
- s"ISR ${isr.mkString("[", ",", "]")} addingReplicas
${addingReplicas.mkString("[", ",", "]")} " +
- s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}.
Previous leader epoch was $leaderEpoch.")
- // We cache the leader epoch here, persisting it only if it's local
(hence having a log dir)
- leaderEpoch = partitionState.leaderEpoch
- leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
- partitionEpoch = partitionState.partitionEpoch
-
- // In the case of successive leader elections in a short time period, a
follower may have
- // entries in its log from a later epoch than any entry in the new
leader's log. In order
- // to ensure that these followers can truncate to the right offset, we
must cache the new
- // leader epoch and the start offset since it should be larger than any
epoch that a follower
- // would try to query.
- leaderLog.maybeAssignEpochStartOffset(leaderEpoch,
leaderEpochStartOffset)
-
- val isNewLeader = !isLeader
- val currentTimeMs = time.milliseconds
+ // We update the epoch start offset and the replicas' state only if the
leader epoch
+ // has changed.
+ if (isNewLeaderEpoch) {
+ val leaderEpochStartOffset = leaderLog.logEndOffset
+ stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId
starts at " +
+ s"leader epoch ${partitionState.leaderEpoch} from offset
$leaderEpochStartOffset " +
+ s"with partition epoch ${partitionState.partitionEpoch}, high
watermark ${leaderLog.highWatermark}, " +
+ s"ISR ${isr.mkString("[", ",", "]")}, adding replicas
${addingReplicas.mkString("[", ",", "]")} and " +
+ s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.
Previous leader epoch was $leaderEpoch.")
+
+ // In the case of successive leader elections in a short time period,
a follower may have
+ // entries in its log from a later epoch than any entry in the new
leader's log. In order
+ // to ensure that these followers can truncate to the right offset, we
must cache the new
+ // leader epoch and the start offset since it should be larger than
any epoch that a follower
+ // would try to query.
+ leaderLog.maybeAssignEpochStartOffset(partitionState.leaderEpoch,
leaderEpochStartOffset)
+
+ // Initialize lastCaughtUpTime of replicas as well as their
lastFetchTimeMs and
+ // lastFetchLeaderLogEndOffset.
+ remoteReplicas.foreach { replica =>
+ replica.resetReplicaState(
+ currentTimeMs = currentTimeMs,
+ leaderEndOffset = leaderEpochStartOffset,
+ isNewLeader = isNewLeader,
+ isFollowerInSync = partitionState.isr.contains(replica.brokerId)
+ )
+ }
- // Initialize lastCaughtUpTime of replicas as well as their
lastFetchTimeMs and
- // lastFetchLeaderLogEndOffset.
- remoteReplicas.foreach { replica =>
- replica.resetReplicaState(
- currentTimeMs = currentTimeMs,
- leaderEndOffset = leaderEpochStartOffset,
- isNewLeader = isNewLeader,
- isFollowerInSync = partitionState.isr.contains(replica.brokerId)
- )
+ // We update the leader epoch and the leader epoch start offset iff the
+ // leader epoch changed.
+ leaderEpoch = partitionState.leaderEpoch
+ leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
+ } else {
+ stateChangeLogger.info(s"Skipped the become-leader state change for
$topicPartition with topic id $topicId " +
+ s"and partition state $partitionState since it is already the leader
with leader epoch $leaderEpoch. " +
+ s"Current high watermark ${leaderLog.highWatermark}, ISR
${isr.mkString("[", ",", "]")}, " +
+ s"adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
+ s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.")
}
+ partitionEpoch = partitionState.partitionEpoch
leaderReplicaIdOpt = Some(localBrokerId)
- // we may need to increment high watermark since ISR could be down to 1
+ // We may need to increment high watermark since ISR could be down to 1.
(maybeIncrementLeaderHW(leaderLog, currentTimeMs = currentTimeMs),
isNewLeader)
}
- // some delayed operations may be unblocked after HW changed
+ // Some delayed operations may be unblocked after HW changed.
if (leaderHWIncremented)
tryCompleteDelayedRequests()
@@ -631,15 +654,20 @@ class Partition(val topicPartition: TopicPartition,
* Make the local replica the follower by setting the new leader and ISR to
empty
* If the leader replica id does not change and the new epoch is equal or one
* greater (that is, no updates have been missed), return false to indicate
to the
- * replica manager that state is already correct and the become-follower
steps can be skipped
+ * replica manager that state is already correct and the become-follower
steps can
+ * be skipped.
*/
def makeFollower(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
- val newLeaderBrokerId = partitionState.leader
- val oldLeaderEpoch = leaderEpoch
- // record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
+ if (partitionState.partitionEpoch < partitionEpoch) {
+ stateChangeLogger.info(s"Skipped the become-follower state change for
$topicPartition with topic id $topicId " +
+ s"and partition state $partitionState since the follower is already
at a newer partition epoch $partitionEpoch.")
+ return false
+ }
+
+ // Record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper
path
controllerEpoch = partitionState.controllerEpoch
@@ -650,32 +678,37 @@ class Partition(val topicPartition: TopicPartition,
removingReplicas =
partitionState.removingReplicas.asScala.map(_.toInt),
LeaderRecoveryState.of(partitionState.leaderRecoveryState)
)
+
try {
createLogIfNotExists(partitionState.isNew, isFutureReplica = false,
highWatermarkCheckpoints, topicId)
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.error(s"A ZooKeeper client exception has occurred.
makeFollower will be skipping the " +
s"state change for the partition $topicPartition with leader
epoch: $leaderEpoch.", e)
-
return false
}
val followerLog = localLogOrException
- val leaderEpochEndOffset = followerLog.logEndOffset
- stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch
${partitionState.leaderEpoch} from " +
- s"offset $leaderEpochEndOffset with high watermark
${followerLog.highWatermark}. " +
- s"Previous leader epoch was $leaderEpoch.")
+ val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
+ if (isNewLeaderEpoch) {
+ val leaderEpochEndOffset = followerLog.logEndOffset
+ stateChangeLogger.info(s"Follower $topicPartition starts at leader
epoch ${partitionState.leaderEpoch} from " +
+ s"offset $leaderEpochEndOffset with partition epoch
${partitionState.partitionEpoch} and " +
+ s"high watermark ${followerLog.highWatermark}. Previous leader epoch
was $leaderEpoch.")
+ } else {
+ stateChangeLogger.info(s"Skipped the become-follower state change for
$topicPartition with topic id $topicId " +
+ s"and partition state $partitionState since it is already a follower
with leader epoch $leaderEpoch.")
+ }
+
+ leaderReplicaIdOpt = Some(partitionState.leader)
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
partitionEpoch = partitionState.partitionEpoch
- if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch ==
oldLeaderEpoch) {
- false
- } else {
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
- true
- }
+ // We must restart the fetchers when the leader epoch changed regardless
of
+ // whether the leader changed as well.
+ isNewLeaderEpoch
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fff7fb273d..1c97671c26 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1599,13 +1599,9 @@ class ReplicaManager(val config: KafkaConfig,
// Update the partition information to be the leader
partitionStates.forKeyValue { (partition, partitionState) =>
try {
- if (partition.makeLeader(partitionState, highWatermarkCheckpoints,
topicIds(partitionState.topicName)))
+ if (partition.makeLeader(partitionState, highWatermarkCheckpoints,
topicIds(partitionState.topicName))) {
partitionsToMakeLeaders += partition
- else
- stateChangeLogger.info(s"Skipped the become-leader state change
after marking its " +
- s"partition as leader with correlation id $correlationId from
controller $controllerId epoch $controllerEpoch for " +
- s"partition ${partition.topicPartition} (last update controller
epoch ${partitionState.controllerEpoch}) " +
- s"since it is already the leader for the partition.")
+ }
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-leader state change
with " +
@@ -1681,14 +1677,9 @@ class ReplicaManager(val config: KafkaConfig,
try {
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
// Only change partition state when the leader is available
- if (partition.makeFollower(partitionState,
highWatermarkCheckpoints, topicIds(partitionState.topicName)))
+ if (partition.makeFollower(partitionState,
highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
partitionsToMakeFollower += partition
- else
- stateChangeLogger.info(s"Skipped the become-follower state
change after marking its partition as " +
- s"follower with correlation id $correlationId from controller
$controllerId epoch $controllerEpoch " +
- s"for partition ${partition.topicPartition} (last update " +
- s"controller epoch ${partitionState.controllerEpoch}) " +
- s"since the new leader $newLeaderBrokerId is the same as the
old leader")
+ }
} else {
// The leader broker should always be present in the metadata
cache.
// If not, we should record the error message and abort the
transition process for this partition
@@ -2180,11 +2171,7 @@ class ReplicaManager(val config: KafkaConfig,
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition,
isNew) =>
try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
- if (!partition.makeLeader(state, offsetCheckpoints,
Some(info.topicId))) {
- stateChangeLogger.info("Skipped the become-leader state change for
" +
- s"$tp with topic id ${info.topicId} because this partition is " +
- "already a local leader.")
- }
+ partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
@@ -2234,9 +2221,6 @@ class ReplicaManager(val config: KafkaConfig,
val state = info.partition.toLeaderAndIsrPartitionState(tp,
isNew)
if (partition.makeFollower(state, offsetCheckpoints,
Some(info.topicId))) {
partitionsToMakeFollower.put(tp, partition)
- } else {
- stateChangeLogger.info("Skipped the become-follower state
change after marking its " +
- s"partition as follower for partition $tp with id
${info.topicId} and partition state $state.")
}
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 042d2500fc..8645c4c9f7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -21,6 +21,7 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.epoch.EpochEntry
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException,
InconsistentTopicIdException, NotLeaderOrFollowerException,
OffsetNotAvailableException, OffsetOutOfRangeException}
@@ -39,10 +40,10 @@ import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
+
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
-
import kafka.server.epoch.LeaderEpochFileCache
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -2105,6 +2106,204 @@ class PartitionTest extends AbstractPartitionTest {
verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
}
+ @Test
+ def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+ val controllerEpoch = 3
+ val leaderId = brokerId
+ val followerId = brokerId + 1
+ val replicas = List(leaderId, followerId)
+ val leaderEpoch = 8
+ val topicId = Uuid.randomUuid()
+
+ val initialLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(true)
+
+ assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ assertEquals(Set(leaderId), partition.partitionState.isr)
+
+ // Follower's state is initialized with unknown offset because it is not
+ // in the ISR.
+ assertReplicaState(partition, followerId,
+ lastCaughtUpTimeMs = 0L,
+ logStartOffset = UnifiedLog.UnknownOffset,
+ logEndOffset = UnifiedLog.UnknownOffset
+ )
+
+ // Follower fetches and updates its replica state.
+ partition.updateFollowerFetchState(
+ followerId = followerId,
+ followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+ followerStartOffset = 0L,
+ followerFetchTimeMs = time.milliseconds(),
+ leaderEndOffset = partition.localLogOrException.logEndOffset
+ )
+
+ assertReplicaState(partition, followerId,
+ lastCaughtUpTimeMs = time.milliseconds(),
+ logStartOffset = 0L,
+ logEndOffset = 0L
+ )
+
+ // makeLeader is called again with the same leader epoch but with
+ // a newer partition epoch. This can happen in KRaft when a partition
+ // is reassigned. The leader epoch is not bumped when we add replicas.
+ val updatedLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(2)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(false)
+
+ assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(2, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ assertEquals(Set(leaderId), partition.partitionState.isr)
+
+ // Follower's state has not been reset.
+ assertReplicaState(partition, followerId,
+ lastCaughtUpTimeMs = time.milliseconds(),
+ logStartOffset = 0L,
+ logEndOffset = 0L
+ )
+ }
+
+ @Test
+ def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
+ val controllerEpoch = 3
+ val leaderId = brokerId
+ val followerId = brokerId + 1
+ val replicas = List(leaderId, followerId)
+ val leaderEpoch = 8
+ val topicId = Uuid.randomUuid()
+
+ val initialLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(true)
+
+ assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ assertEquals(Set(leaderId), partition.partitionState.isr)
+ assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+
+ val leaderLog = partition.localLogOrException
+ assertEquals(Some(EpochEntry(leaderEpoch, 0L)),
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+
+ // Write to the log to increment the log end offset.
+ leaderLog.appendAsLeader(MemoryRecords.withRecords(0L,
CompressionType.NONE, 0,
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k1".getBytes, "v1".getBytes)
+ ), leaderEpoch = leaderEpoch)
+
+ // makeLeader is called again with the same leader epoch but with
+ // a newer partition epoch.
+ val updatedLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(2)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(false)
+
+ assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(2, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ assertEquals(Set(leaderId), partition.partitionState.isr)
+ assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+ assertEquals(Some(EpochEntry(leaderEpoch, 0L)),
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+ }
+
+ @Test
+ def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
+ val controllerEpoch = 3
+ val leaderId = brokerId
+ val replicas = List(leaderId)
+ val leaderEpoch = 8
+ val topicId = Uuid.randomUuid()
+
+ val initialLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(true)
+
+ assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+
+ // makeLeader is called again with the same leader epoch but with
+ an older partition epoch.
+ val updatedLeaderState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId).map(Int.box).asJava)
+ .setPartitionEpoch(0)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(false)
+
+ assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ }
+
+ @Test
+ def testIgnoreFollowerPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
+ val controllerEpoch = 3
+ val leaderId = brokerId
+ val followerId = brokerId + 1
+ val replicas = List(leaderId, followerId)
+ val leaderEpoch = 8
+ val topicId = Uuid.randomUuid()
+
+ val initialFollowerState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(followerId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId, followerId).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(true)
+
+ assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints,
Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+
+ // makeFollower is called again with the same leader epoch but with
+ an older partition epoch.
+ val updatedFollowerState = new LeaderAndIsrPartitionState()
+ .setControllerEpoch(controllerEpoch)
+ .setLeader(followerId)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(List(leaderId, followerId).map(Int.box).asJava)
+ .setPartitionEpoch(1)
+ .setReplicas(replicas.map(Int.box).asJava)
+ .setIsNew(true)
+
+ assertFalse(partition.makeFollower(updatedFollowerState,
offsetCheckpoints, Some(topicId)))
+ assertEquals(1, partition.getPartitionEpoch)
+ assertEquals(leaderEpoch, partition.getLeaderEpoch)
+ }
+
private def makeLeader(
topicId: Option[Uuid],
controllerEpoch: Int,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index da2d2a9084..ac2fc9926d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest,
ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage,
TopicsDelta, TopicsImage}
+import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.junit.jupiter.api.Assertions._
@@ -63,7 +64,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyInt}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{mock, reset, times, verify, when}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -3622,6 +3623,221 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
+ @Test
+ def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithZkPath(): Unit = {
+ val localId = 0
+ val topicPartition = new TopicPartition("foo", 0)
+
+ val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(
+ timer = new MockTimer(time),
+ brokerId = localId,
+ aliveBrokerIds = Seq(localId, localId + 1, localId + 2),
+ mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+ )
+
+ try {
+ when(mockReplicaFetcherManager.removeFetcherForPartitions(
+ Set(topicPartition))
+ ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+ // Make the local replica the follower.
+ var request = makeLeaderAndIsrRequest(
+ topicId = FOO_UUID,
+ topicPartition = topicPartition,
+ replicas = Seq(localId, localId + 1),
+ leaderAndIsr = LeaderAndIsr(
+ leader = localId + 1,
+ leaderEpoch = 0,
+ isr = List(localId, localId + 1),
+ leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+ partitionEpoch = 0
+ )
+ )
+
+ replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+ // Check the state of that partition.
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+ assertEquals(0, followerPartition.getPartitionEpoch)
+
+ // Verify that the partition was removed and added back.
+
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
+ topicId = Some(FOO_UUID),
+ leader = BrokerEndPoint(localId + 1, s"host${localId + 1}", localId +
1),
+ currentLeaderEpoch = 0,
+ initOffset = 0
+ )))
+
+ reset(mockReplicaFetcherManager)
+
+ // Apply changes that bumps the partition epoch.
+ request = makeLeaderAndIsrRequest(
+ topicId = FOO_UUID,
+ topicPartition = topicPartition,
+ replicas = Seq(localId, localId + 1, localId + 2),
+ leaderAndIsr = LeaderAndIsr(
+ leader = localId + 1,
+ leaderEpoch = 0,
+ isr = List(localId, localId + 1),
+ leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+ partitionEpoch = 1
+ )
+ )
+
+ replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+ // Partition updates is fenced based on the leader epoch on the ZK path.
+ assertEquals(0, followerPartition.getPartitionEpoch)
+
+ // As the update is fenced based on the leader epoch,
removeFetcherForPartitions and
+ // addFetcherForPartitions are not called at all.
+ reset(mockReplicaFetcherManager)
+
+ // Apply changes that bumps the leader epoch.
+ request = makeLeaderAndIsrRequest(
+ topicId = FOO_UUID,
+ topicPartition = topicPartition,
+ replicas = Seq(localId, localId + 1, localId + 2),
+ leaderAndIsr = LeaderAndIsr(
+ leader = localId + 2,
+ leaderEpoch = 1,
+ isr = List(localId, localId + 1, localId + 2),
+ leaderRecoveryState = LeaderRecoveryState.RECOVERED,
+ partitionEpoch = 2
+ )
+ )
+
+ replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ())
+
+ assertFalse(followerPartition.isLeader)
+ assertEquals(1, followerPartition.getLeaderEpoch)
+ assertEquals(2, followerPartition.getPartitionEpoch)
+
+ // Verify that the partition was removed and added back.
+
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
+ topicId = Some(FOO_UUID),
+ leader = BrokerEndPoint(localId + 2, s"host${localId + 2}", localId +
2),
+ currentLeaderEpoch = 1,
+ initOffset = 0
+ )))
+ } finally {
+ replicaManager.shutdown()
+ }
+
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
+ @Test
+ def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath(): Unit
= {
+ val localId = 0
+ val topicPartition = new TopicPartition("foo", 0)
+
+ val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(
+ timer = new MockTimer(time),
+ brokerId = localId,
+ mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+ )
+
+ try {
+ when(mockReplicaFetcherManager.removeFetcherForPartitions(
+ Set(topicPartition))
+ ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+ // Make the local replica the follower.
+ var followerTopicsDelta = new TopicsDelta(TopicsImage.EMPTY)
+ followerTopicsDelta.replay(new
TopicRecord().setName("foo").setTopicId(FOO_UUID))
+ followerTopicsDelta.replay(new PartitionRecord()
+ .setPartitionId(0)
+ .setTopicId(FOO_UUID)
+ .setReplicas(util.Arrays.asList(localId, localId + 1))
+ .setIsr(util.Arrays.asList(localId, localId + 1))
+ .setRemovingReplicas(Collections.emptyList())
+ .setAddingReplicas(Collections.emptyList())
+ .setLeader(localId + 1)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ )
+ var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+ // Check the state of that partition.
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+ assertEquals(0, followerPartition.getPartitionEpoch)
+
+ // Verify that the partition was removed and added back.
+
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
+ topicId = Some(FOO_UUID),
+ leader = BrokerEndPoint(localId + 1, "localhost", 9093),
+ currentLeaderEpoch = 0,
+ initOffset = 0
+ )))
+
+ reset(mockReplicaFetcherManager)
+
+ // Apply changes that bumps the partition epoch.
+ followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics())
+ followerTopicsDelta.replay(new PartitionChangeRecord()
+ .setPartitionId(0)
+ .setTopicId(FOO_UUID)
+ .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2))
+ .setIsr(util.Arrays.asList(localId, localId + 1))
+ )
+ followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+ assertEquals(1, followerPartition.getPartitionEpoch)
+
+ // Verify that partition's fetcher was not impacted.
+ verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set.empty)
+ verify(mockReplicaFetcherManager).addFetcherForPartitions(Map.empty)
+
+ reset(mockReplicaFetcherManager)
+
+ // Apply changes that bumps the leader epoch.
+ followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics())
+ followerTopicsDelta.replay(new PartitionChangeRecord()
+ .setPartitionId(0)
+ .setTopicId(FOO_UUID)
+ .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2))
+ .setIsr(util.Arrays.asList(localId, localId + 1, localId + 2))
+ .setLeader(localId + 2)
+ )
+ println(followerTopicsDelta.changedTopic(FOO_UUID).partitionChanges())
+ followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+ assertFalse(followerPartition.isLeader)
+ assertEquals(1, followerPartition.getLeaderEpoch)
+ assertEquals(2, followerPartition.getPartitionEpoch)
+
+ // Verify that the partition was removed and added back.
+
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition))
+
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition ->
InitialFetchState(
+ topicId = Some(FOO_UUID),
+ leader = BrokerEndPoint(localId + 2, "localhost", 9093),
+ currentLeaderEpoch = 1,
+ initOffset = 0
+ )))
+ } finally {
+ replicaManager.shutdown()
+ }
+
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean):
TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)