junrao commented on a change in pull request #11327: URL: https://github.com/apache/kafka/pull/11327#discussion_r710316445
########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -103,11 +103,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val lastClean = allCleanerCheckpoints val now = Time.SYSTEM.milliseconds partitions.iterator.map { tp => - val log = logs.get(tp) - val lastCleanOffset = lastClean.get(tp) - val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now) - val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset) - uncleanableBytes + Option(logs.get(tp)).map( Review comment: We could do the following to make the code a bit simpler and more consistent. ``` map { log => val lastCleanOffset = lastClean.get(tp) ... } ``` ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -511,6 +514,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition)) } } + + def maintainUncleanablePartitions(): Unit = { + // Remove deleted partitions from uncleanablePartitions + inLock(lock) { + // Remove non-existing logDir + // Note: we don't use retain or filterInPlace method in this function because retain in deprecated in + // scala 2.13 while filterInPlace is not available in scala 2.12. + uncleanablePartitions.filterNot { + case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir) + }.keys.foreach { + uncleanablePartitions.remove(_) + } + + uncleanablePartitions.values.foreach { + // Remove deleted partitions + partitions => partitions.filterNot(logs.contains(_)).foreach { + partitions.remove(_) Review comment: Hmm, does removing from a set have a side effect on an ongoing iterator? If so, we want to figure out the items to remove first and do the removal at the end. Ditto for the code above in line 527. 
########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -511,6 +514,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition)) } } + + def maintainUncleanablePartitions(): Unit = { + // Remove deleted partitions from uncleanablePartitions + inLock(lock) { + // Remove non-existing logDir + // Note: we don't use retain or filterInPlace method in this function because retain in deprecated in Review comment: in deprecated => is deprecated ########## File path: core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ########## @@ -46,7 +44,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K TestUtils.clearYammerMetrics() } - @Timeout(value = DEFAULT_MAX_WAIT_MS, unit = TimeUnit.MILLISECONDS) + @Timeout(90) Review comment: Since we use mock time, why do we need a much longer timeout? ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -87,17 +87,23 @@ import scala.util.control.ControlThrowable * @param logDirs The directories where offset checkpoints reside * @param logs The pool of logs * @param time A way to control the passage of time + * @param scheduler The thread pool scheduler used for background actions */ class LogCleaner(initialConfig: CleanerConfig, val logDirs: Seq[File], val logs: Pool[TopicPartition, UnifiedLog], val logDirFailureChannel: LogDirFailureChannel, - time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable + time: Time = Time.SYSTEM, + private[log] val scheduler: Scheduler) extends Logging with KafkaMetricsGroup with BrokerReconfigurable { /* Log cleaner configuration which may be dynamically updated */ @volatile private var config = initialConfig + // Visible for testing + val housekeepingDelayMs = 30000 + val housekeepingIntervalMs = 30000 Review comment: 30 seconds seems quite frequent. 
Could we do 5 mins? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org