This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e23df0c8da KAFKA-18486 Migrate tests to use applyDelta instead of becomeLeaderOrFollower for testInconsistentIdReturnsError and others (#20014)
5e23df0c8da is described below
commit 5e23df0c8daf11f478f63c9f50aefa5b7e636d7f
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Wed Jun 25 08:02:27 2025 -0400
KAFKA-18486 Migrate tests to use applyDelta instead of becomeLeaderOrFollower for testInconsistentIdReturnsError and others (#20014)
Continues the migration effort for KAFKA-18486 by replacing usage of the
deprecated `becomeLeaderOrFollower` API with `applyDelta` in several test
cases; the new pattern is sketched after the test list below.
#### Updated tests:
- `testInconsistentIdReturnsError`
- `testMaybeAddLogDirFetchers`
- `testMaybeAddLogDirFetchersPausingCleaning`
- `testSuccessfulBuildRemoteLogAuxStateMetrics`
- `testVerificationForTransactionalPartitionsOnly`
- `testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate`
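For reviewers unfamiliar with the new pattern, here is a minimal sketch of the `applyDelta` flow these tests now share, using the existing `topicsCreateDelta` and `imageFromTopics` helpers from `ReplicaManagerTest` (parameter values are illustrative only):

```scala
// Build a TopicsDelta that registers the topic and makes broker 0 the
// leader of partition 0, derive the resulting MetadataImage, and hand
// both to the ReplicaManager - no LeaderAndIsrRequest round trip needed.
val delta = topicsCreateDelta(startId = 0, isStartIdLeader = true,
  partitions = List(0), topicName = topic, topicId = topicIds(topic))
val image = imageFromTopics(delta.apply())
rm.applyDelta(delta, image)
// The partition is now hosted and can be asserted on directly.
val partition = rm.getPartitionOrException(new TopicPartition(topic, 0))
```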
Reviewers: Jhen-Yung Hsu <[email protected]>, TaiJuWu <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 259 +++++++--------------
1 file changed, 78 insertions(+), 181 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8e7c6181dd3..12f7d23164a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
+import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
@@ -133,7 +133,6 @@ class ReplicaManagerTest {
// Constants defined for readability
private val zkVersion = 0
- private val correlationId = 0
private val controllerEpoch = 0
private val brokerEpoch = 0L
@@ -312,38 +311,26 @@ class ReplicaManagerTest {
alterPartitionManager = alterPartitionManager)
try {
- val partition = rm.createPartition(new TopicPartition(topic, 0))
- partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
- new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
+ val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = topic, topicId = topicIds(topic))
+ val image = imageFromTopics(delta.apply())
+ rm.applyDelta(delta, image)
+ val partition = rm.getPartitionOrException(topicPartition)
- rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(Seq[Integer](0).asJava)
- .setPartitionEpoch(0)
- .setReplicas(Seq[Integer](0).asJava)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
- appendRecords(rm, new TopicPartition(topic, 0),
+ appendRecords(rm, topicPartition,
MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes())))
- logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath)
+ logManager.maybeUpdatePreferredLogDir(topicPartition, dir2.getAbsolutePath)
partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error
rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), _ => None)
- rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset)))
+ rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(topicPartition).foreach(s => assertEquals(0L, s.fetchOffset)))
// make sure alter log dir thread has processed the data
rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork())
assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions())
// the future log becomes the current log, so the partition state should get removed
- rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(new TopicPartition(topic, 0))))
+ rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(topicPartition)))
} finally {
rm.shutdown(checkpointHW = false)
}
@@ -362,7 +349,6 @@ class ReplicaManagerTest {
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
- val tp0 = new TopicPartition(topic, 0)
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@@ -375,28 +361,13 @@ class ReplicaManagerTest {
alterPartitionManager = alterPartitionManager)
try {
- val partition = rm.createPartition(tp0)
- partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
- new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), Option.apply(topicId))
-
- val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(0)
- .setIsr(Seq[Integer](0).asJava)
- .setPartitionEpoch(0)
- .setReplicas(Seq[Integer](0).asJava)
- .setIsNew(false)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
- // expect the errorCounts only has 1 entry with Errors.NONE
- val errorCounts = response.errorCounts()
- assertEquals(1, response.errorCounts().size())
- assertNotNull(errorCounts.get(Errors.NONE))
- spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)
+ val delta = topicsCreateDelta(startId = 0, isStartIdLeader = true,
+ partitions = List(0), topicName = topic, topicId = topicId)
+ val image = imageFromTopics(delta.apply())
+ rm.applyDelta(delta, image)
+ val partition = rm.getPartitionOrException(topicPartition)
+
+ spyLogManager.maybeUpdatePreferredLogDir(topicPartition, dir2.getAbsolutePath)
if (futureLogCreated) {
// create future log before maybeAddLogDirFetchers invoked
@@ -404,7 +375,7 @@ class ReplicaManagerTest {
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
} else {
val mockLog = mock(classOf[UnifiedLog])
- when(spyLogManager.getLog(tp0, isFuture = true)).thenReturn(Option.apply(mockLog))
+ when(spyLogManager.getLog(topicPartition, isFuture = true)).thenReturn(Option.apply(mockLog))
when(mockLog.topicId).thenReturn(Optional.of(topicId))
when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
}
@@ -1225,65 +1196,51 @@ class ReplicaManagerTest {
}
}
- @Test
- def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
- verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new Properties, expectTruncation = false)
- }
-
/**
* If a partition becomes a follower and the leader is unchanged it should check for truncation
* if the epoch has increased by more than one (which suggests it has missed an update). For
* IBP version 2.7 onwards, we don't require this since we can truncate at any time based
* on diverging epochs returned in fetch responses.
+ * This test assumes IBP >= 2.7 behavior, so `expectTruncation` is set to false and truncation is not expected.
*/
- private def verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps: Properties,
- expectTruncation: Boolean): Unit = {
- val topicPartition = 0
+ @Test
+ def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
+ val extraProps = new Properties
val followerBrokerId = 0
val leaderBrokerId = 1
- val controllerId = 0
- val controllerEpoch = 0
var leaderEpoch = 1
val leaderEpochIncrement = 2
- val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
val offsetFromLeader = 5
-
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
- topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
- expectTruncation = expectTruncation, localLogOffset = Optional.of(10), offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = Optional.of(topicId))
+ topicPartition.partition(), leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+ expectTruncation = false, localLogOffset = Optional.of(10), offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = Optional.of(topicId))
try {
// Initialize partition state to follower, with leader = 1, leaderEpoch = 1
- val tp = new TopicPartition(topic, topicPartition)
- val partition = replicaManager.createPartition(tp)
+ val partition = replicaManager.createPartition(topicPartition)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
- partition.makeFollower(
- leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
- offsetCheckpoints,
- None)
+ val followerDelta = topicsCreateDelta(startId = followerBrokerId, isStartIdLeader = false, partitions = List(topicPartition.partition()), List.empty, topic, topicIds(topic), leaderEpoch)
+ replicaManager.applyDelta(followerDelta, imageFromTopics(followerDelta.apply()))
+
+ // Verify log created and partition is hosted
+ val localLog = replicaManager.localLog(topicPartition)
+ assertTrue(localLog.isDefined, "Log should be created for follower after applyDelta")
+ val hostedPartition = replicaManager.getPartition(topicPartition)
+ assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
// Make local partition a follower - the epoch jump of more than 1 signals a missed leader update, but under IBP >= 2.7 behavior no truncation is expected even though the leader does not change
leaderEpoch += leaderEpochIncrement
- val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(
- controllerId, controllerEpoch, brokerEpoch,
- Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(followerBrokerId, "host1", 0),
- new Node(leaderBrokerId, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
- (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId))
+ val epochJumpDelta = topicsCreateDelta(startId = followerBrokerId, isStartIdLeader = false, partitions = List(topicPartition.partition()), List.empty, topic, topicIds(topic), leaderEpoch)
+ replicaManager.applyDelta(epochJumpDelta, imageFromTopics(epochJumpDelta.apply()))
+
assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
- // Truncation should have happened once
- if (expectTruncation) {
- verify(mockLogMgr).truncateTo(Map(tp -> offsetFromLeader), isFuture = false)
- }
- verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(tp), any())
+ verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), any())
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -1859,16 +1816,16 @@ class ReplicaManagerTest {
val producerEpoch = 0.toShort
val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
-
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1))
+ val brokerList = Seq[Integer](0, 1).asJava
try {
- replicaManager.becomeLeaderOrFollower(1,
- makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
- (_, _) => ())
+ val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
+ val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 1, replicas = brokerList, isr = brokerList)
+ val image0 = imageFromTopics(leaderDelta0.apply())
+ replicaManager.applyDelta(leaderDelta0, image0)
- replicaManager.becomeLeaderOrFollower(1,
- makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
- (_, _) => ())
+ val image1 = imageFromTopics(leaderDelta1.apply())
+ replicaManager.applyDelta(leaderDelta1, image1)
// If we supply no transactional ID and idempotent records, we do not verify.
val idempotentRecords = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, sequence,
@@ -3651,8 +3608,6 @@ class ReplicaManagerTest {
@Test
def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
- val tp0 = new TopicPartition(topic, 0)
-
val remoteLogManager = mock(classOf[RemoteLogManager])
val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(
@@ -3664,40 +3619,25 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
try {
-
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
- replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+ replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
- val topicIds = Map(tp0.topic -> topicId).asJava
- val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(
- new LeaderAndIsrRequest.PartitionState()
- .setTopicName(tp0.topic)
- .setPartitionIndex(tp0.partition)
- .setControllerEpoch(0)
- .setLeader(1)
- .setLeaderEpoch(0)
- .setIsr(partition0Replicas)
- .setPartitionEpoch(0)
- .setReplicas(partition0Replicas)
- .setIsNew(true)
- ).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
// Verify the metrics for build remote log state and for failures are zero before replicas start to fetch
- assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
- assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+ assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+ assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+ val leaderDelta = createLeaderDelta(topicId, topicPartition, leaderId = 1, replicas = partition0Replicas, isr = partition0Replicas)
+ val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
- waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
- "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
- assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+ waitUntilTrue(() => brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
+ "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+ assertEquals(0, brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
// Verify aggregate metrics
waitUntilTrue(() => brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count > 0,
"Should have all topic buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
@@ -3881,41 +3821,35 @@ class ReplicaManagerTest {
def testInconsistentIdReturnsError(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
- val brokerList = Seq[Integer](0, 1).asJava
- val topicPartition = new TopicPartition(topic, 0)
- val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
- val topicNames = topicIds.asScala.map(_.swap).asJava
-
- val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
- val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
-
- def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest =
- new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
- Seq(new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(0)
- .setLeader(0)
- .setLeaderEpoch(epoch)
- .setIsr(brokerList)
- .setPartitionEpoch(0)
- .setReplicas(brokerList)
- .setIsNew(true)).asJava,
- topicIds,
- Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
- val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
- assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
-
- val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ())
- assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+ val invalidTopicId = Uuid.randomUuid()
+
+ val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+ partitions = List(0), topicName = topic, topicId = topicIds(topic))
+ val initialImage = imageFromTopics(initialDelta.apply())
+ replicaManager.applyDelta(initialDelta, initialImage)
+
+ val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+ partitions = List(0), topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+ val updateImage = imageFromTopics(updateDelta.apply())
+ replicaManager.applyDelta(updateDelta, updateImage)
// Send request with inconsistent ID.
- val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
- assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition))
+ val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+ partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 1)
+ val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+ val exception1 = assertThrows(classOf[IllegalStateException], () => {
+ replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+ })
+ assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not
${invalidTopicId} as expected", exception1.getMessage)
+
+ val inconsistentDelta2 = topicsCreateDelta(0, isStartIdLeader = true,
+ partitions = List(0), topicName = topic, topicId = invalidTopicId, leaderEpoch = 2)
+ val inconsistentImage2 = imageFromTopics(inconsistentDelta2.apply())
+ val exception2 = assertThrows(classOf[IllegalStateException], () => {
+ replicaManager.applyDelta(inconsistentDelta2, inconsistentImage2)
+ })
+ assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not
${invalidTopicId} as expected", exception2.getMessage)
- val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
- assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -3984,43 +3918,6 @@ class ReplicaManagerTest {
}
}
- private def makeLeaderAndIsrRequest(
- topicId: Uuid,
- topicPartition: TopicPartition,
- replicas: Seq[Int],
- leaderAndIsr: LeaderAndIsr,
- isNew: Boolean = true,
- brokerEpoch: Int = 0,
- controllerId: Int = 0,
- controllerEpoch: Int = 0
- ): LeaderAndIsrRequest = {
- val partitionState = new LeaderAndIsrRequest.PartitionState()
- .setTopicName(topicPartition.topic)
- .setPartitionIndex(topicPartition.partition)
- .setControllerEpoch(controllerEpoch)
- .setLeader(leaderAndIsr.leader)
- .setLeaderEpoch(leaderAndIsr.leaderEpoch)
- .setIsr(leaderAndIsr.isr)
- .setPartitionEpoch(leaderAndIsr.partitionEpoch)
- .setReplicas(replicas.map(Int.box).asJava)
- .setIsNew(isNew)
-
- def mkNode(replicaId: Int): Node = {
- new Node(replicaId, s"host-$replicaId", 9092)
- }
-
- val nodes = Set(mkNode(controllerId)) ++ replicas.map(mkNode).toSet
-
- new LeaderAndIsrRequest.Builder(
- controllerId,
- controllerEpoch,
- brokerEpoch,
- Seq(partitionState).asJava,
- Map(topicPartition.topic -> topicId).asJava,
- nodes.asJava
- ).build()
- }
-
@Test
def testActiveProducerState(): Unit = {
val brokerId = 0