This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 991a10c19fd KAFKA-18486 Migrate
ReplicaManagerTest#testOffsetOutOfRangeExceptionWhenFetchMessages to use
applyDelta (#19952)
991a10c19fd is described below
commit 991a10c19fde604f734c83ed7f52c5b75633b05f
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Jun 13 15:47:51 2025 +0800
KAFKA-18486 Migrate
ReplicaManagerTest#testOffsetOutOfRangeExceptionWhenFetchMessages to use
applyDelta (#19952)
Change `becomeLeaderOrFollower` to `applyDelta` in the following test cases:
* testOffsetOutOfRangeExceptionWhenFetchMessages
* testOffsetOutOfRangeExceptionWhenReadFromLog
* testOldFollowerLosesMetricsWhenReassignPartitions
* testOldLeaderLosesMetricsWhenReassignPartitions
Reviewers: Bolin Lin <[email protected]>, Lan Ding
<[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 223 +++++++--------------
1 file changed, 69 insertions(+), 154 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0499dd2f6f2..1cd340aa329 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3418,11 +3418,8 @@ class ReplicaManagerTest {
@Test
def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = {
- val controllerEpoch = 0
val leaderEpoch = 0
val leaderEpochIncrement = 1
- val correlationId = 0
- val controllerId = 0
val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
val (rm0, rm1) =
prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]),
mockTopicStats1)
@@ -3435,64 +3432,38 @@ class ReplicaManagerTest {
val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic ->
Uuid.randomUuid()).asJava
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId,
0, brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true),
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp1.topic)
- .setPartitionIndex(tp1.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(1)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(partition1Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition1Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
- rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
- rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
+ val delta1 = createLeaderDelta(topicIds.get(topic), tp0,
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
+ delta1.replay(new PartitionRecord()
+ .setPartitionId(tp1.partition)
+ .setTopicId(topicIds.get(topic))
+ .setIsr(partition1Replicas)
+ .setReplicas(partition1Replicas)
+ .setRemovingReplicas(util.List.of())
+ .setAddingReplicas(util.List.of())
+ .setLeader(partition1Replicas.get(0))
+ .setLeaderEpoch(leaderEpoch)
+ .setPartitionEpoch(0)
+ )
+ val leaderMetadataImage1 = imageFromTopics(delta1.apply())
+ rm0.applyDelta(delta1, leaderMetadataImage1)
+ rm1.applyDelta(delta1, leaderMetadataImage1)
// make broker 0 the leader of partition 1 so broker 1 loses its
leadership position
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(
controllerId, controllerEpoch, brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true),
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp1.topic)
- .setPartitionIndex(tp1.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
- .setIsr(partition1Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition1Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
- rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
- rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
+ val delta2 = createLeaderDelta(topicIds.get(topic), tp0,
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch
+ leaderEpochIncrement)
+ delta2.replay(new PartitionRecord()
+ .setPartitionId(tp1.partition)
+ .setTopicId(topicIds.get(topic))
+ .setIsr(partition1Replicas)
+ .setReplicas(partition1Replicas)
+ .setRemovingReplicas(util.List.of())
+ .setAddingReplicas(util.List.of())
+ .setLeader(partition1Replicas.get(1))
+ .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
+ .setPartitionEpoch(0)
+ )
+ val leaderMetadataImage2 = imageFromTopics(delta2.apply())
+ rm0.applyDelta(delta2, leaderMetadataImage2)
+ rm1.applyDelta(delta2, leaderMetadataImage2)
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
@@ -3512,11 +3483,8 @@ class ReplicaManagerTest {
@Test
def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = {
- val controllerEpoch = 0
val leaderEpoch = 0
val leaderEpochIncrement = 1
- val correlationId = 0
- val controllerId = 0
val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
val (rm0, rm1) =
prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]),
mockTopicStats1)
@@ -3529,65 +3497,38 @@ class ReplicaManagerTest {
val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic ->
Uuid.randomUuid()).asJava
- val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId,
0, brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(1)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true),
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp1.topic)
- .setPartitionIndex(tp1.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(1)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(partition1Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition1Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
- rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
- rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _)
=> ())
+ val delta = createLeaderDelta(topicIds.get(topic), tp0,
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
+ delta.replay(new PartitionRecord()
+ .setPartitionId(tp1.partition)
+ .setTopicId(topicIds.get(topic))
+ .setIsr(partition1Replicas)
+ .setReplicas(partition1Replicas)
+ .setRemovingReplicas(util.List.of())
+ .setAddingReplicas(util.List.of())
+ .setLeader(partition1Replicas.get(0))
+ .setLeaderEpoch(leaderEpoch)
+ .setPartitionEpoch(0)
+ )
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ rm0.applyDelta(delta, leaderMetadataImage)
+ rm1.applyDelta(delta, leaderMetadataImage)
// make broker 0 the leader of partition 1 so broker 1 loses its
leadership position
- val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(controllerId,
- controllerEpoch, brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true),
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp1.topic)
- .setPartitionIndex(tp1.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(0)
- .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
- .setIsr(partition1Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition1Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
- rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
- rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _)
=> ())
+ val delta2 = createLeaderDelta(topicIds.get(topic), tp0,
partition0Replicas.get(1), partition0Replicas, partition0Replicas, leaderEpoch
+ leaderEpochIncrement)
+ delta2.replay(new PartitionRecord()
+ .setPartitionId(tp1.partition)
+ .setTopicId(topicIds.get(topic))
+ .setIsr(partition1Replicas)
+ .setReplicas(partition1Replicas)
+ .setRemovingReplicas(util.List.of())
+ .setAddingReplicas(util.List.of())
+ .setLeader(partition1Replicas.get(1))
+ .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
+ .setPartitionEpoch(0)
+ )
+ val leaderMetadataImage2 = imageFromTopics(delta2.apply())
+ rm0.applyDelta(delta2, leaderMetadataImage2)
+ rm1.applyDelta(delta2, leaderMetadataImage2)
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
@@ -3603,7 +3544,6 @@ class ReplicaManagerTest {
// verify that broker 1 did remove its metrics when no longer being the
leader of partition 1
verify(mockTopicStats1).removeOldLeaderMetrics(topic)
- verify(mockTopicStats1).removeOldFollowerMetrics(topic)
}
private def prepareDifferentReplicaManagers(brokerTopicStats1:
BrokerTopicStats,
@@ -3669,22 +3609,9 @@ class ReplicaManagerTest {
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(leaderEpoch)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+ val delta = createLeaderDelta(topicIds.get(topic), tp0,
partition0Replicas.get(0), partition0Replicas, partition0Replicas)
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ replicaManager.applyDelta(delta, leaderMetadataImage)
val params = new FetchParams(replicaId, 1, 1000, 0, 100,
FetchIsolation.LOG_END, Optional.empty)
// when reading log, it'll throw OffsetOutOfRangeException, which will
be handled separately
@@ -3713,6 +3640,7 @@ class ReplicaManagerTest {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean):
Unit = {
+ val brokerList = Seq[Integer](0, 1).asJava
val replicaId = if (isFromFollower) 1 else -1
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
@@ -3721,25 +3649,11 @@ class ReplicaManagerTest {
try {
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
- val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0,
brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(leaderEpoch)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
+ val delta = createLeaderDelta(topicIds.get(topic), tp0,
brokerList.get(0), brokerList, brokerList)
+ val leaderMetadataImage = imageFromTopics(delta.apply())
+ replicaManager.applyDelta(delta, leaderMetadataImage)
val params = new FetchParams(replicaId, 1, 1000, 10, 100,
FetchIsolation.LOG_END, Optional.empty)
val fetchOffset = 1
@@ -4087,7 +4001,8 @@ class ReplicaManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0,
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
- val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic,
0), 1, util.List.of(0, 1), util.List.of(0, 1))
+ val brokerList = Seq[Integer](0, 1).asJava
+ val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic,
0), brokerList.get(1), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)