showuon commented on code in PR #14051:
URL: https://github.com/apache/kafka/pull/14051#discussion_r1271666143


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1290,6 +1290,56 @@ class PartitionTest extends AbstractPartitionTest {
     )
   }
 
+  @Test
+  def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
+    val mockMetadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = spy(new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      () => defaultBrokerEpoch(brokerId),
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      mockMetadataCache,
+      logManager,
+      alterPartitionManager))
+
+    when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, 
ArgumentMatchers.eq(topicPartition)))
+      .thenReturn(None)
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 6, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
+
+    val initializeTimeMs = time.milliseconds()
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(List[Integer](brokerId).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+
+    doAnswer(_ => {
+      // simulate topic is deleted at the moment
+      partition.delete()
+      val replica = new Replica(remoteBrokerId, topicPartition)
+      partition.updateFollowerFetchState(replica, 
mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
+      mock(classOf[LogReadInfo])
+    }).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), 
anyBoolean(), anyBoolean())
+
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)

Review Comment:
   nit: wrapping this call in `assertDoesNotThrow(() => fetchFollower...)` would make the intent of the test much clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to