This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd702906331 KAFKA-18486 Remove ReplicaManager#becomeLeaderOrFollower
from testFencedErrorCausedByBecomeLeader and other similar methods (#19966)
fd702906331 is described below
commit fd70290633191b6f53a9d4ddb24e3a8b619fcd3f
Author: Nick Guo <[email protected]>
AuthorDate: Mon Jun 16 21:43:41 2025 +0800
KAFKA-18486 Remove ReplicaManager#becomeLeaderOrFollower from
testFencedErrorCausedByBecomeLeader and other similar methods (#19966)
The included tests are as follows:
- testFencedErrorCausedByBecomeLeader
- testFetchBeyondHighWatermark
- testFetchFollowerNotAllowedForOlderClients
- testFetchFromFollowerShouldNotRunPreferLeaderSelect
- testFetchFromLeaderAlwaysAllowed
- testFetchMessagesWhenNotFollowerForOnePartition
- testFetchRequestRateMetrics
- testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined
- testFollowerFetchWithDefaultSelectorNoForcedHwPropagation
- testFollowerStateNotUpdatedIfLogReadFails
I removed `testFetchMessagesWithInconsistentTopicId ` as it's no longer
needed, the "topicId" is now mandatory and cannot be null in our new
implementation.
Reviewers: Jhen-Yung Hsu <[email protected]>, Lan Ding
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 326 ++++-----------------
1 file changed, 65 insertions(+), 261 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d5d500c87c7..e3b37a8df2e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -512,37 +512,21 @@ class ReplicaManagerTest {
}
}
- @Test
- def testFencedErrorCausedByBecomeLeader(): Unit = {
- testFencedErrorCausedByBecomeLeader(0)
- testFencedErrorCausedByBecomeLeader(1)
- testFencedErrorCausedByBecomeLeader(10)
- }
-
- private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int):
Unit = {
+ @ParameterizedTest
+ @ValueSource(ints = Array(0, 1, 10))
+ def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
+ val localId = 0
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time))
try {
- val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
- def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new
LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(epoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _)
=> ())
val partition = replicaManager.getPartitionOrException(new
TopicPartition(topic, 0))
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ ==
partition.log.get.dir.getParentFile).size)
@@ -554,7 +538,12 @@ class ReplicaManagerTest {
// make sure the future log is created
replicaManager.futureLocalLogOrException(topicPartition)
assertEquals(1,
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
- (1 to loopEpochChange).foreach(epoch =>
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) =>
()))
+ (1 to loopEpochChange).foreach(
+ epoch => {
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = epoch)
+ replicaManager.applyDelta(leaderDelta,
imageFromTopics(leaderDelta.apply()))
+ }
+ )
// wait for the ReplicaAlterLogDirsThread to complete
TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@@ -974,25 +963,16 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1, 2).asJava
- val partition = rm.createPartition(new TopicPartition(topic, 0))
+ val tp = new TopicPartition(topic, 0)
+ val partition = rm.createPartition(tp)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2,
"host2", 2)).asJava).build()
- rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+ val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, replicas
= brokerList, isr = brokerList)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ rm.applyDelta(leaderDelta, leaderMetadataImage)
+
rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@@ -1030,6 +1010,7 @@ class ReplicaManagerTest {
@Test
def testFollowerStateNotUpdatedIfLogReadFails(): Unit = {
+ val localId = 0
val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5
@@ -1038,25 +1019,11 @@ class ReplicaManagerTest {
try {
val tp = new TopicPartition(topic, 0)
val tidp = new TopicIdPartition(topicId, tp)
- val replicas = aliveBrokersIds.toList.map(Int.box).asJava
// Broker 0 becomes leader of the partition
- val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(replicas)
- .setPartitionEpoch(0)
- .setReplicas(replicas)
- .setIsNew(true)
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(leaderAndIsrPartitionState).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest, (_, _) => ())
- assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
// Follower replica state is initialized, but initial state is not known
assertTrue(replicaManager.onlinePartition(tp).isDefined)
@@ -1129,6 +1096,7 @@ class ReplicaManagerTest {
@Test
def testFetchMessagesWithInconsistentTopicId(): Unit = {
+ val localId = 0
val maxFetchBytes = 1024 * 1024
val aliveBrokersIds = Seq(0, 1)
val leaderEpoch = 5
@@ -1137,25 +1105,11 @@ class ReplicaManagerTest {
try {
val tp = new TopicPartition(topic, 0)
val tidp = new TopicIdPartition(topicId, tp)
- val replicas = aliveBrokersIds.toList.map(Int.box).asJava
// Broker 0 becomes leader of the partition
- val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(replicas)
- .setPartitionEpoch(0)
- .setReplicas(replicas)
- .setIsNew(true)
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(leaderAndIsrPartitionState).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest, (_, _) => ())
- assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
assertEquals(Some(topicId),
replicaManager.getPartitionOrException(tp).topicId)
@@ -1195,54 +1149,6 @@ class ReplicaManagerTest {
val fetch2 = successfulFetch.headOption.filter(_._1 ==
zeroTidp).map(_._2)
assertTrue(fetch2.isDefined)
assertEquals(Errors.NONE, fetch2.get.error)
-
- // Next create a topic without a topic ID written in the log.
- val tp2 = new TopicPartition("noIdTopic", 0)
- val tidp2 = new TopicIdPartition(Uuid.randomUuid(), tp2)
-
- // Broker 0 becomes leader of the partition
- val leaderAndIsrPartitionState2 = new
LeaderAndIsrRequest.PartitionState()
- .setTopicName("noIdTopic")
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(replicas)
- .setPartitionEpoch(0)
- .setReplicas(replicas)
- .setIsNew(true)
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(leaderAndIsrPartitionState2).asJava,
- Collections.emptyMap(),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest2, (_, _) => ())
- assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
-
- assertEquals(None, replicaManager.getPartitionOrException(tp2).topicId)
-
- // Fetch messages simulating the request containing a topic ID. We
should not have an error.
- fetchPartitions(
- replicaManager,
- replicaId = 1,
- fetchInfos = Seq(tidp2 -> validFetchPartitionData),
- responseCallback = callback
- )
- val fetch3 = successfulFetch.headOption.filter(_._1 == tidp2).map(_._2)
- assertTrue(fetch3.isDefined)
- assertEquals(Errors.NONE, fetch3.get.error)
-
- // Fetch messages simulating the request not containing a topic ID. We
should not have an error.
- val zeroTidp2 = new TopicIdPartition(Uuid.ZERO_UUID,
tidp2.topicPartition)
- fetchPartitions(
- replicaManager,
- replicaId = 1,
- fetchInfos = Seq(zeroTidp2 -> validFetchPartitionData),
- responseCallback = callback
- )
- val fetch4 = successfulFetch.headOption.filter(_._1 ==
zeroTidp2).map(_._2)
- assertTrue(fetch4.isDefined)
- assertEquals(Errors.NONE, fetch4.get.error)
-
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -1257,6 +1163,7 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
try {
+ val leaderEpoch = 0
// Create 2 partitions, assign replica 0 as the leader for both a
different follower (1 and 2) for each
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
@@ -1267,34 +1174,14 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava
- val topicIds = Map(tp0.topic -> topicId, tp1.topic -> topicId).asJava
- val leaderEpoch = 0
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(leaderEpoch)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true),
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp1.topic)
- .setPartitionIndex(tp1.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(partition1Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition1Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+
+ val leaderDelta0 = createLeaderDelta(topicIds(topic), tp0, 0,
partition0Replicas, partition0Replicas)
+ val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+ replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+
+ val leaderDelta1 = createLeaderDelta(topicIds(topic), tp1, 0,
partition1Replicas, partition1Replicas)
+ val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+ replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
// Append a couple of messages.
for (i <- 1 to 2) {
@@ -1642,27 +1529,13 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time),
propsModifier = props =>
props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG,
classOf[MockReplicaSelector].getName))
try {
- val leaderBrokerId = 0
- val followerBrokerId = 1
- val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// Make this replica the follower
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(1)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) =>
())
+ val followerDelta = createFollowerDelta(topicId, tp0, 0, 1, 1)
+ val followerMetadataImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerMetadataImage)
val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
@@ -1687,13 +1560,13 @@ class ReplicaManagerTest {
@Test
def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined():
Unit = {
+ val localId = 0
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time),
propsModifier = props =>
props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG,
"org.apache.kafka.common.replica.RackAwareReplicaSelector"))
try {
val leaderBrokerId = 0
val followerBrokerId = 1
- val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
@@ -1707,20 +1580,9 @@ class ReplicaManagerTest {
// Make this replica the leader
val leaderEpoch = 1
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) =>
())
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
// The leader must record the follower's fetch offset to make it
eligible for follower fetch selection
val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue,
Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer])
@@ -1767,27 +1629,15 @@ class ReplicaManagerTest {
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, topicId =
Optional.of(topicId))
try {
-
- val brokerList = Seq[Integer](0, 1).asJava
-
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
// Make this replica the follower
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) =>
())
+ val followerDelta = createFollowerDelta(topicId, tp0, 1, 0, 1)
+ val followerImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerImage)
val simpleRecords = Seq(new SimpleRecord("a".getBytes), new
SimpleRecord("b".getBytes))
val appendResult = appendRecords(replicaManager, tp0,
@@ -1873,21 +1723,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
- val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _)
=> ())
+
+ val followerDelta = createFollowerDelta(topicId, tp0, 0, 1)
+ val followerImage = imageFromTopics(followerDelta.apply())
+ replicaManager.applyDelta(followerDelta, followerImage)
// Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
val clientMetadata = new DefaultClientMetadata("", "", null,
KafkaPrincipal.ANONYMOUS, "")
@@ -1909,6 +1748,7 @@ class ReplicaManagerTest {
@Test
def testFetchRequestRateMetrics(): Unit = {
+ val localId = 0
val mockTimer = new MockTimer(time)
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1))
@@ -1917,22 +1757,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
- val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
def assertMetricCount(expected: Int): Unit = {
assertEquals(expected,
replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
@@ -2026,6 +1854,7 @@ class ReplicaManagerTest {
@Test
def testFetchFromLeaderAlwaysAllowed(): Unit = {
+ val localId = 0
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1))
try {
@@ -2033,22 +1862,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
- val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(1)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)).asJava,
- topicIds.asJava,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) =>
())
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
val clientMetadata = new DefaultClientMetadata("", "", null,
KafkaPrincipal.ANONYMOUS, "")
var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L,
0L, 100,
@@ -2828,6 +2645,7 @@ class ReplicaManagerTest {
allLogs.put(topicPartitionObj, mockLog)
when(mockLogMgr.allLogs).thenReturn(allLogs.values.asScala)
when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true)
+ when(mockLogMgr.directoryId(anyString)).thenReturn(None)
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId,
s"host$brokerId", brokerId))
@@ -3414,8 +3232,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas)
.setReplicas(partition1Replicas)
- .setRemovingReplicas(util.List.of())
- .setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(0))
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
@@ -3431,8 +3247,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas)
.setReplicas(partition1Replicas)
- .setRemovingReplicas(util.List.of())
- .setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(1))
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setPartitionEpoch(0)
@@ -3479,8 +3293,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas)
.setReplicas(partition1Replicas)
- .setRemovingReplicas(util.List.of())
- .setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(0))
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
@@ -3496,8 +3308,6 @@ class ReplicaManagerTest {
.setTopicId(topicIds.get(topic))
.setIsr(partition1Replicas)
.setReplicas(partition1Replicas)
- .setRemovingReplicas(util.List.of())
- .setAddingReplicas(util.List.of())
.setLeader(partition1Replicas.get(1))
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setPartitionEpoch(0)
@@ -4321,8 +4131,6 @@ class ReplicaManagerTest {
.setTopicId(topicId)
.setReplicas(effectiveReplicas)
.setIsr(effectiveIsr)
- .setRemovingReplicas(Collections.emptyList())
- .setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
@@ -4349,8 +4157,6 @@ class ReplicaManagerTest {
.setTopicId(topicId)
.setReplicas(util.Arrays.asList(followerId, leaderId))
.setIsr(util.Arrays.asList(followerId, leaderId))
- .setRemovingReplicas(Collections.emptyList())
- .setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
@@ -5539,13 +5345,13 @@ class ReplicaManagerTest {
}
}
- private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean,
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty,
topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
+ private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean,
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty,
topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0):
TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
partitions.foreach { partition =>
- val record = partitionRecord(startId, leader, partition, topicId)
+ val record = partitionRecord(startId, leader, partition, topicId,
leaderEpoch)
if (directoryIds.nonEmpty) {
record.setDirectories(directoryIds.asJava)
}
@@ -5555,16 +5361,14 @@ class ReplicaManagerTest {
delta
}
- private def partitionRecord(startId: Int, leader: Int, partition: Int = 0,
topicId: Uuid = FOO_UUID) = {
+ private def partitionRecord(startId: Int, leader: Int, partition: Int = 0,
topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0) = {
new PartitionRecord()
.setPartitionId(partition)
.setTopicId(topicId)
.setReplicas(util.Arrays.asList(startId, startId + 1))
.setIsr(util.Arrays.asList(startId, startId + 1))
- .setRemovingReplicas(Collections.emptyList())
- .setAddingReplicas(Collections.emptyList())
.setLeader(leader)
- .setLeaderEpoch(0)
+ .setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
}