chia7712 commented on code in PR #20082:
URL: https://github.com/apache/kafka/pull/20082#discussion_r2639707507


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5360,6 +5360,71 @@ class ReplicaManagerTest {
     assertEquals(expectedTopicId, fetchState.get.topicId)
   }
 
+  @Test
+  def testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner(): 
Unit = {
+    val localId = 0
+    val tp = new TopicPartition(topic, 0)
+    val tpId = new TopicIdPartition(topicId, tp)
+
+    val props = TestUtils.createBrokerConfig(localId)
+    val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
+    val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
+    val path3 = TestUtils.tempRelativeDir("data3").getAbsolutePath
+    props.put("log.dirs", Seq(path1, path2, path3).mkString(","))
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new 
File(_)), cleanerConfig = new CleanerConfig(true))
+    mockLogMgr.startup(Set())
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
+
+    try {
+      val spiedPartition = spy(Partition(tpId, time, replicaManager))
+      replicaManager.addOnlinePartition(tp, spiedPartition)
+
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
+
+      // Move the replica to the second log directory.
+      val partition = replicaManager.getPartitionOrException(tp)
+      val firstLogDir = partition.log.get.dir.getParentFile
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ 
== firstLogDir).head
+      replicaManager.alterReplicaLogDirs(Map(tp -> 
newReplicaFolder.getAbsolutePath))
+
+      // Prevent promotion of future replica
+      
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()

Review Comment:
   Hi all, this test can fail intermittently with the following error:
   
   ```
   java.lang.ClassCastException: class java.lang.Boolean cannot be cast to 
class org.apache.kafka.storage.internals.log.UnifiedLog (java.lang.Boolean is 
in module java.base of loader 'bootstrap'; 
org.apache.kafka.storage.internals.log.UnifiedLog is in unnamed module of 
loader 'app')
        at 
kafka.cluster.Partition.futureLocalLogOrException(Partition.scala:408)
        at 
kafka.server.ReplicaManager.futureLocalLogOrException(ReplicaManager.scala:587)
        at 
kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner(ReplicaManagerTest.scala:5534)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
   ```
   
   The replica thread interfered with the `doReturn(false)` stubbing, causing 
Mockito to apply the stubbed return value to the wrong method, which is why a 
`Boolean` is returned where a `UnifiedLog` is expected.
   
   You can reproduce the error with the following command:
   ```
   N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests 
ReplicaManagerTest -PmaxParallelForks=4 \
   ; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
   ```
   
   I will file a patch for it 😄



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to