jolshan commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950576126


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit 
= {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != 
controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != 
controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> 
Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, 
partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, 
LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> 
ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k 
-> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, 
LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment 
with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, 
LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, 
LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   Is this leader epoch +4 because of the startup and the reassignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to