This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12761c07aea KAFKA-19458: resume cleaning on future replica dir change (#20082)
12761c07aea is described below

commit 12761c07aead9743e7cfb036d1059c8a773ad106
Author: Gaurav Narula <[email protected]>
AuthorDate: Thu Jul 17 21:13:09 2025 +0100

    KAFKA-19458: resume cleaning on future replica dir change (#20082)
    
    `ReplicaManager#alterReplicaLogDirs` does not resume the log cleaner
    while handling an `AlterReplicaLogDirs` request for a topic partition
    which already has an `AlterReplicaLogDirs` in progress, leading to a
    resource leak where cleaning for the topic partition remains paused
    even after the log directory has been altered.
    
    This change ensures we invoke `LogManager#resumeCleaning` if the future
    replica directory has changed.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../kafka/server/builders/LogManagerBuilder.java   |  5 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  9 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 15 ++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 67 +++++++++++++++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  6 +-
 5 files changed, 94 insertions(+), 8 deletions(-)
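
Background on why the leak matters: cleaning is paused per partition while a future replica is copied, each pause must eventually be matched by a resume, and resuming a partition that is not paused throws an IllegalStateException (see the comment in the ReplicaManager hunk below). A minimal sketch of that invariant (illustrative only, not Kafka's LogCleanerManager):

    import scala.collection.mutable

    // Illustrative only: a reference-counted pause, not Kafka's LogCleanerManager.
    final class PauseTracker {
      private val pauseCount = mutable.Map.empty[String, Int]

      def pause(tp: String): Unit =
        pauseCount.update(tp, pauseCount.getOrElse(tp, 0) + 1)

      def resume(tp: String): Unit = pauseCount.get(tp) match {
        case Some(1) => pauseCount.remove(tp)          // last pause released
        case Some(n) => pauseCount.update(tp, n - 1)
        case None    => throw new IllegalStateException(s"$tp is not paused")
      }

      def isPaused(tp: String): Boolean = pauseCount.contains(tp)
    }

    // Before this fix, a second alterReplicaLogDirs for an in-flight move
    // behaved like: pause(tp); pause(tp); resume(tp) -- one pause was never
    // released, so the partition stayed uncleanable.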

diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 71d192dc5ea..6de61915e8e 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -24,6 +24,7 @@ import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogCleaner;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
@@ -172,6 +173,8 @@ public class LogManagerBuilder {
                               logDirFailureChannel,
                               time,
                               remoteStorageSystemEnable,
-                              initialTaskDelayMs);
+                              initialTaskDelayMs,
+                              LogCleaner::new
+                );
     }
 }
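
The builder change above wires LogCleaner::new as the default factory, so production construction is unchanged while tests can inject a wrapper. A reduced sketch of this default-argument factory pattern (hypothetical Cleaner/Manager types, not the real signatures):

    // Hypothetical types; only the injection pattern mirrors the change above.
    class Cleaner(val dirs: Seq[String]) {
      def startup(): Unit = println(s"cleaning ${dirs.size} dirs")
    }

    class Manager(
      dirs: Seq[String],
      // The default keeps existing call sites unchanged.
      cleanerFactory: Seq[String] => Cleaner = ds => new Cleaner(ds)
    ) {
      val cleaner: Cleaner = cleanerFactory(dirs)
      cleaner.startup()
    }

    // Production: new Manager(Seq("/data"))
    // Tests:      new Manager(Seq("/data"), ds => org.mockito.Mockito.spy(new Cleaner(ds)))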
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d9da339e215..d3f64793685 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -77,7 +77,10 @@ class LogManager(logDirs: Seq[File],
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
                  remoteStorageSystemEnable: Boolean,
-                 val initialTaskDelayMs: Long) extends Logging {
+                 val initialTaskDelayMs: Long,
+                 cleanerFactory: (CleanerConfig, util.List[File], ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) => LogCleaner =
+                  (cleanerConfig, files, map, logDirFailureChannel, time) => new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time)
+                ) extends Logging {
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -627,7 +630,7 @@ class LogManager(logDirs: Seq[File],
                          initialTaskDelayMs)
     }
     if (cleanerConfig.enableCleaner) {
-      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
+      _cleaner = cleanerFactory(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
       _cleaner.startup()
     } else {
       warn("The config `log.cleaner.enable` is deprecated and will be removed 
in Kafka 5.0. Starting from Kafka 5.0, the log cleaner will always be enabled, 
and this config will be ignored.")
@@ -893,7 +896,7 @@ class LogManager(logDirs: Seq[File],
   /**
    * Resume cleaning of the provided partition and log a message about it.
    */
-  private def resumeCleaning(topicPartition: TopicPartition): Unit = {
+  def resumeCleaning(topicPartition: TopicPartition): Unit = {
     if (cleaner != null) {
       cleaner.resumeCleaning(util.Set.of(topicPartition))
       info(s"Cleaning for partition $topicPartition is resumed")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 448ec1cf264..8fa705ef8c4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -501,10 +501,15 @@ class ReplicaManager(val config: KafkaConfig,
   // Visible for testing
   def createPartition(topicPartition: TopicPartition): Partition = {
     val partition = Partition(topicPartition, time, this)
-    allPartitions.put(topicPartition, HostedPartition.Online(partition))
+    addOnlinePartition(topicPartition, partition)
     partition
   }
 
+  // Visible for testing
+  private[server] def addOnlinePartition(topicPartition: TopicPartition, partition: Partition): Unit = {
+    allPartitions.put(topicPartition, HostedPartition.Online(partition))
+  }
+
   def onlinePartition(topicPartition: TopicPartition): Option[Partition] = {
     getPartition(topicPartition) match {
       case HostedPartition.Online(partition) => Some(partition)
@@ -1149,7 +1154,15 @@ class ReplicaManager(val config: KafkaConfig,
              // Stop current replica movement if the destinationDir is different from the existing destination log directory
              if (partition.futureReplicaDirChanged(destinationDir)) {
                replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
+                // There's a chance that the future replica can be promoted between the check for futureReplicaDirChanged
+                // and the call to removeFetcherForPartitions. We want to avoid resuming cleaning again in that case, to avoid
+                // an IllegalStateException. The presence of a future log after the call to removeFetcherForPartitions
+                // implies that it has not been promoted, as both synchronize on partitionMapLock.
+                val futureReplicaPromoted = partition.futureLog.isEmpty
                 partition.removeFutureLocalReplica()
+                if (!futureReplicaPromoted) {
+                  logManager.resumeCleaning(topicPartition)
+                }
               }
             case HostedPartition.Offline(_) =>
              throw new KafkaStorageException(s"Partition $topicPartition is offline")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 88e0ae5d35d..623c282185b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -5360,6 +5360,71 @@ class ReplicaManagerTest {
     assertEquals(expectedTopicId, fetchState.get.topicId)
   }
 
+  @Test
+  def testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner(): Unit = {
+    val localId = 0
+    val tp = new TopicPartition(topic, 0)
+    val tpId = new TopicIdPartition(topicId, tp)
+
+    val props = TestUtils.createBrokerConfig(localId)
+    val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
+    val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
+    val path3 = TestUtils.tempRelativeDir("data3").getAbsolutePath
+    props.put("log.dirs", Seq(path1, path2, path3).mkString(","))
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)), cleanerConfig = new CleanerConfig(true))
+    mockLogMgr.startup(Set())
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
+
+    try {
+      val spiedPartition = spy(Partition(tpId, time, replicaManager))
+      replicaManager.addOnlinePartition(tp, spiedPartition)
+
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic))
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
+
+      // Move the replica to the second log directory.
+      val partition = replicaManager.getPartitionOrException(tp)
+      val firstLogDir = partition.log.get.dir.getParentFile
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == firstLogDir).head
+      replicaManager.alterReplicaLogDirs(Map(tp -> newReplicaFolder.getAbsolutePath))
+
+      // Prevent promotion of future replica
+      doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
+
+      // Make sure the future log is created with the correct topic ID.
+      val futureLog = replicaManager.futureLocalLogOrException(tp)
+      assertEquals(Optional.of(topicId), futureLog.topicId)
+
+      // Move the replica to the third log directory
+      val finalReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(it => it == firstLogDir || it == newReplicaFolder).head
+      replicaManager.alterReplicaLogDirs(Map(tp -> finalReplicaFolder.getAbsolutePath))
+
+      reset(spiedPartition)
+
+      TestUtils.waitUntilTrue(() => {
+        replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
+      }, s"ReplicaAlterLogDirsThread should be gone", waitTimeMs = 60_000)
+
+      verify(replicaManager.logManager.cleaner, times(2)).resumeCleaning(Set(tp).asJava)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+      mockLogMgr.shutdown()
+    }
+  }
+
   @Test
   def testReplicaAlterLogDirs(): Unit = {
     val localId = 0
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9cc76e2e2e1..8b0affae9ea 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -973,7 +973,9 @@ object TestUtils extends Logging {
                    brokerTopicStats = new BrokerTopicStats,
                   logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
                    remoteStorageSystemEnable = remoteStorageSystemEnable,
-                   initialTaskDelayMs = initialTaskDelayMs)
+                   initialTaskDelayMs = initialTaskDelayMs,
+                   cleanerFactory = (cleanerConfig, files, map, logDirFailureChannel, time) => Mockito.spy(new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time))
+    )
 
     if (logFn.isDefined) {
       val spyLogManager = Mockito.spy(logManager)
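
Because createLogManager now routes cleaner construction through Mockito.spy, tests such as the new one above can verify interactions on logManager.cleaner directly. A minimal demonstration of the spy behavior (hypothetical Greeter class, not Kafka code):

    import org.mockito.Mockito.{spy, times, verify}

    // A spy delegates to the real object while recording every call.
    class Greeter { def greet(name: String): String = s"hello, $name" }

    val g = spy(new Greeter)
    g.greet("kafka")                   // the real method runs and is recorded
    verify(g, times(1)).greet("kafka") // passes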
