chia7712 commented on code in PR #20082:
URL: https://github.com/apache/kafka/pull/20082#discussion_r2639707507
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5360,6 +5360,71 @@ class ReplicaManagerTest {
assertEquals(expectedTopicId, fetchState.get.topicId)
}
+ @Test
+ def testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner():
Unit = {
+ val localId = 0
+ val tp = new TopicPartition(topic, 0)
+ val tpId = new TopicIdPartition(topicId, tp)
+
+ val props = TestUtils.createBrokerConfig(localId)
+ val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
+ val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
+ val path3 = TestUtils.tempRelativeDir("data3").getAbsolutePath
+ props.put("log.dirs", Seq(path1, path2, path3).mkString(","))
+ val config = KafkaConfig.fromProps(props)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new
File(_)), cleanerConfig = new CleanerConfig(true))
+ mockLogMgr.startup(Set())
+ val replicaManager = new ReplicaManager(
+ metrics = metrics,
+ config = config,
+ time = time,
+ scheduler = new MockScheduler(time),
+ logManager = mockLogMgr,
+ quotaManagers = quotaManager,
+ metadataCache = metadataCache,
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+ alterPartitionManager = alterPartitionManager,
+ addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
+
+ try {
+ val spiedPartition = spy(Partition(tpId, time, replicaManager))
+ replicaManager.addOnlinePartition(tp, spiedPartition)
+
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
+
+ // Move the replica to the second log directory.
+ val partition = replicaManager.getPartitionOrException(tp)
+ val firstLogDir = partition.log.get.dir.getParentFile
+ val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_
== firstLogDir).head
+ replicaManager.alterReplicaLogDirs(Map(tp ->
newReplicaFolder.getAbsolutePath))
+
+ // Prevent promotion of future replica
+
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
Review Comment:
Hi all, this test fails intermittently with the following error:
```
java.lang.ClassCastException: class java.lang.Boolean cannot be cast to
class org.apache.kafka.storage.internals.log.UnifiedLog (java.lang.Boolean is
in module java.base of loader 'bootstrap';
org.apache.kafka.storage.internals.log.UnifiedLog is in unnamed module of
loader 'app')
at
kafka.cluster.Partition.futureLocalLogOrException(Partition.scala:408)
at
kafka.server.ReplicaManager.futureLocalLogOrException(ReplicaManager.scala:587)
at
kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner(ReplicaManagerTest.scala:5534)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
```
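The replica thread interfered with the `doReturn(false)` stubbing, causing
Mockito to apply the stubbed return value to the wrong method. That is why
the stack trace above shows a `Boolean` being cast to `UnifiedLog` in
`futureLocalLogOrException`.
You can reproduce the error with the following command: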
```
N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
```
Will file a patch for it 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]