This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12761c07aea KAFKA-19458: resume cleaning on future replica dir change
(#20082)
12761c07aea is described below
commit 12761c07aead9743e7cfb036d1059c8a773ad106
Author: Gaurav Narula <[email protected]>
AuthorDate: Thu Jul 17 21:13:09 2025 +0100
KAFKA-19458: resume cleaning on future replica dir change (#20082)
`ReplicaManager#alterReplicaLogDirs` does not resume the log cleaner when
handling an `AlterReplicaLogDirs` request for a topic partition that
already has an `AlterReplicaLogDirs` in progress. This leads to a resource
leak: cleaning for the affected topic partition remains paused even after
the log directory has been altered.
This change ensures we invoke `LogManager#resumeCleaning` if the future
replica directory has changed and the future replica has not already
been promoted.
Reviewers: Jun Rao <[email protected]>
---
.../kafka/server/builders/LogManagerBuilder.java | 5 +-
core/src/main/scala/kafka/log/LogManager.scala | 9 ++-
.../main/scala/kafka/server/ReplicaManager.scala | 15 ++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 67 +++++++++++++++++++++-
.../test/scala/unit/kafka/utils/TestUtils.scala | 6 +-
5 files changed, 94 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 71d192dc5ea..6de61915e8e 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -24,6 +24,7 @@ import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogCleaner;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
@@ -172,6 +173,8 @@ public class LogManagerBuilder {
logDirFailureChannel,
time,
remoteStorageSystemEnable,
- initialTaskDelayMs);
+ initialTaskDelayMs,
+ LogCleaner::new
+ );
}
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index d9da339e215..d3f64793685 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -77,7 +77,10 @@ class LogManager(logDirs: Seq[File],
logDirFailureChannel: LogDirFailureChannel,
time: Time,
remoteStorageSystemEnable: Boolean,
- val initialTaskDelayMs: Long) extends Logging {
+ val initialTaskDelayMs: Long,
+ cleanerFactory: (CleanerConfig, util.List[File],
ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) =>
LogCleaner =
+ (cleanerConfig, files, map, logDirFailureChannel, time) =>
new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time)
+ ) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -627,7 +630,7 @@ class LogManager(logDirs: Seq[File],
initialTaskDelayMs)
}
if (cleanerConfig.enableCleaner) {
- _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava,
currentLogs, logDirFailureChannel, time)
+ _cleaner = cleanerFactory(cleanerConfig, liveLogDirs.asJava,
currentLogs, logDirFailureChannel, time)
_cleaner.startup()
} else {
warn("The config `log.cleaner.enable` is deprecated and will be removed
in Kafka 5.0. Starting from Kafka 5.0, the log cleaner will always be enabled,
and this config will be ignored.")
@@ -893,7 +896,7 @@ class LogManager(logDirs: Seq[File],
/**
* Resume cleaning of the provided partition and log a message about it.
*/
- private def resumeCleaning(topicPartition: TopicPartition): Unit = {
+ def resumeCleaning(topicPartition: TopicPartition): Unit = {
if (cleaner != null) {
cleaner.resumeCleaning(util.Set.of(topicPartition))
info(s"Cleaning for partition $topicPartition is resumed")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 448ec1cf264..8fa705ef8c4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -501,10 +501,15 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
def createPartition(topicPartition: TopicPartition): Partition = {
val partition = Partition(topicPartition, time, this)
- allPartitions.put(topicPartition, HostedPartition.Online(partition))
+ addOnlinePartition(topicPartition, partition)
partition
}
+ // Visible for testing
+ private[server] def addOnlinePartition(topicPartition: TopicPartition,
partition: Partition): Unit = {
+ allPartitions.put(topicPartition, HostedPartition.Online(partition))
+ }
+
def onlinePartition(topicPartition: TopicPartition): Option[Partition] = {
getPartition(topicPartition) match {
case HostedPartition.Online(partition) => Some(partition)
@@ -1149,7 +1154,15 @@ class ReplicaManager(val config: KafkaConfig,
// Stop current replica movement if the destinationDir is different from the existing destination log directory
if (partition.futureReplicaDirChanged(destinationDir)) {
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
+ // There's a chance that the future replica can be promoted between the check for futureReplicaDirChanged
+ // and call to removeFetcherForPartitions. We want to avoid resuming cleaning again in that case to avoid
+ // an IllegalStateException. The presence of a future log after the call to removeFetcherForPartitions
+ // implies that it has not been promoted as both synchronize on partitionMapLock.
+ val futureReplicaPromoted = partition.futureLog.isEmpty
partition.removeFutureLocalReplica()
+ if (!futureReplicaPromoted) {
+ logManager.resumeCleaning(topicPartition)
+ }
}
case HostedPartition.Offline(_) =>
throw new KafkaStorageException(s"Partition $topicPartition is
offline")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 88e0ae5d35d..623c282185b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader,
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments,
ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult,
RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig,
FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel,
LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener,
LogSegments, ProducerStateManager, ProducerStateManagerConfig,
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -5360,6 +5360,71 @@ class ReplicaManagerTest {
assertEquals(expectedTopicId, fetchState.get.topicId)
}
+ @Test
+ def testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner():
Unit = {
+ val localId = 0
+ val tp = new TopicPartition(topic, 0)
+ val tpId = new TopicIdPartition(topicId, tp)
+
+ val props = TestUtils.createBrokerConfig(localId)
+ val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
+ val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
+ val path3 = TestUtils.tempRelativeDir("data3").getAbsolutePath
+ props.put("log.dirs", Seq(path1, path2, path3).mkString(","))
+ val config = KafkaConfig.fromProps(props)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new
File(_)), cleanerConfig = new CleanerConfig(true))
+ mockLogMgr.startup(Set())
+ val replicaManager = new ReplicaManager(
+ metrics = metrics,
+ config = config,
+ time = time,
+ scheduler = new MockScheduler(time),
+ logManager = mockLogMgr,
+ quotaManagers = quotaManager,
+ metadataCache = metadataCache,
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+ alterPartitionManager = alterPartitionManager,
+ addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
+
+ try {
+ val spiedPartition = spy(Partition(tpId, time, replicaManager))
+ replicaManager.addOnlinePartition(tp, spiedPartition)
+
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
+
+ // Move the replica to the second log directory.
+ val partition = replicaManager.getPartitionOrException(tp)
+ val firstLogDir = partition.log.get.dir.getParentFile
+ val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_
== firstLogDir).head
+ replicaManager.alterReplicaLogDirs(Map(tp ->
newReplicaFolder.getAbsolutePath))
+
+ // Prevent promotion of future replica
+
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
+
+ // Make sure the future log is created with the correct topic ID.
+ val futureLog = replicaManager.futureLocalLogOrException(tp)
+ assertEquals(Optional.of(topicId), futureLog.topicId)
+
+ // Move the replica to the third log directory
+ val finalReplicaFolder =
replicaManager.logManager.liveLogDirs.filterNot(it => it == firstLogDir || it
== newReplicaFolder).head
+ replicaManager.alterReplicaLogDirs(Map(tp ->
finalReplicaFolder.getAbsolutePath))
+
+ reset(spiedPartition)
+
+ TestUtils.waitUntilTrue(() => {
+ replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+ replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
+ }, s"ReplicaAlterLogDirsThread should be gone", waitTimeMs = 60_000)
+
+ verify(replicaManager.logManager.cleaner,
times(2)).resumeCleaning(Set(tp).asJava)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ mockLogMgr.shutdown()
+ }
+ }
+
@Test
def testReplicaAlterLogDirs(): Unit = {
val localId = 0
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9cc76e2e2e1..8b0affae9ea 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -56,7 +56,7 @@ import
org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfi
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner,
LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@@ -973,7 +973,9 @@ object TestUtils extends Logging {
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new
LogDirFailureChannel(logDirs.size),
remoteStorageSystemEnable = remoteStorageSystemEnable,
- initialTaskDelayMs = initialTaskDelayMs)
+ initialTaskDelayMs = initialTaskDelayMs,
+ cleanerFactory = (cleanerConfig, files, map,
logDirFailureChannel, time) => Mockito.spy(new LogCleaner(cleanerConfig, files,
map, logDirFailureChannel, time))
+ )
if (logFn.isDefined) {
val spyLogManager = Mockito.spy(logManager)