chia7712 commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1602929057


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -314,6 +314,77 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest(name = "testMaybeAddLogDirFetchersPausingCleaning with 
futureLogCreated: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testMaybeAddLogDirFetchersPausingCleaning(futureLogCreated: Boolean): 
Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+    val spyLogManager = spy(logManager)
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val tp0 = new TopicPartition(topic, 0)
+    val uuid = Uuid.randomUuid()
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = spyLogManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    try {
+      val partition = rm.createPartition(tp0)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+      rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(0)
+          .setIsr(Seq[Integer](0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0).asJava)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, Uuid.randomUuid()),

Review Comment:
   My point was this test case will produce different topic ids. For example, 
the following assert will fail
   ```scala
         assertEquals(spyLogManager.getLog(tp0, isFuture = false).get.topicId,
           spyLogManager.getLog(tp0, isFuture = true).get.topicId)
   ```
   
   It seems `becomeLeaderOrFollower` set the topic id of "log", and the mock 
return a different topic id of future log. IIRC (and @showuon your confirm) 
they should have same topic ID, and so I'm a bit confused by this test case. 
   
   Please feel free to correct me if I misunderstand anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to