[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618552#comment-16618552 ]
ASF GitHub Bot commented on KAFKA-7322: --------------------------------------- lindong28 closed pull request #5591: KAFKA-7322: Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated URL: https://github.com/apache/kafka/pull/5591 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 91ddbf09305..0b4abe80ef1 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ -import scala.collection.{Set, mutable} +import scala.collection.{Iterable, Set, mutable} /** * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. @@ -219,10 +219,10 @@ class LogCleaner(initialConfig: CleanerConfig, } /** - * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. - */ - def resumeCleaning(topicPartition: TopicPartition) { - cleanerManager.resumeCleaning(topicPartition) + * Resume the cleaning of paused partitions. 
+ */ + def resumeCleaning(topicPartitions: Iterable[TopicPartition]) { + cleanerManager.resumeCleaning(topicPartitions) } /** @@ -246,6 +246,15 @@ class LogCleaner(initialConfig: CleanerConfig, isCleaned } + /** + * To prevent race between retention and compaction, + * retention threads need to make this call to obtain: + * @return A list of log partitions that retention threads can safely work on + */ + def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = { + cleanerManager.pauseCleaningForNonCompactedPartitions() + } + // Only for testing private[kafka] def currentConfig: CleanerConfig = config @@ -315,14 +324,16 @@ class LogCleaner(initialConfig: CleanerConfig, true } val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() - deletable.foreach{ - case (topicPartition, log) => - try { + + try { + deletable.foreach { + case (_, log) => log.deleteOldSegments() - } finally { - cleanerManager.doneDeleting(topicPartition) - } + } + } finally { + cleanerManager.doneDeleting(deletable.map(_._1)) } + if (!cleaned) pause(config.backOffMs, TimeUnit.MILLISECONDS) } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index ba8d7c7e9c0..83d902f952a 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time import org.apache.kafka.common.errors.KafkaStorageException -import scala.collection.{immutable, mutable} +import scala.collection.{Iterable, immutable, mutable} private[log] sealed trait LogCleaningState private[log] case object LogCleaningInProgress extends LogCleaningState @@ -148,6 +148,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } + /** + * Pause logs cleaning for logs that do not have compaction enabled + * and do not have other deletion or compaction in 
progress. + * This is to handle potential race between retention and cleaner threads when users + * switch topic configuration between compacted and non-compacted topic. + * @return retention logs that have log cleaning successfully paused + */ + def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = { + inLock(lock) { + val deletableLogs = logs.filter { + case (_, log) => !log.config.compact // pick non-compacted logs + }.filterNot { + case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress + } + + deletableLogs.foreach { + case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused) + } + deletableLogs + } + } + /** * Find any logs that have compact and delete enabled */ @@ -170,7 +192,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], def abortCleaning(topicPartition: TopicPartition) { inLock(lock) { abortAndPauseCleaning(topicPartition) - resumeCleaning(topicPartition) + resumeCleaning(Seq(topicPartition)) } info(s"The cleaning for partition $topicPartition is aborted") } @@ -206,23 +228,25 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /** - * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. - */ - def resumeCleaning(topicPartition: TopicPartition) { + * Resume the cleaning of paused partitions. 
+ */ + def resumeCleaning(topicPartitions: Iterable[TopicPartition]){ inLock(lock) { - inProgress.get(topicPartition) match { - case None => - throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.") - case Some(state) => - state match { - case LogCleaningPaused => - inProgress.remove(topicPartition) - case s => - throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.") + topicPartitions.foreach { + topicPartition => + inProgress.get(topicPartition) match { + case None => + throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.") + case Some(state) => + state match { + case LogCleaningPaused => + inProgress.remove(topicPartition) + case s => + throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.") + } } } } - info(s"Compaction for partition $topicPartition is resumed") } /** @@ -322,18 +346,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } - def doneDeleting(topicPartition: TopicPartition): Unit = { + def doneDeleting(topicPartitions: Iterable[TopicPartition]): Unit = { inLock(lock) { - inProgress.get(topicPartition) match { - case Some(LogCleaningInProgress) => - inProgress.remove(topicPartition) - case Some(LogCleaningAborted) => - inProgress.put(topicPartition, LogCleaningPaused) - pausedCleaningCond.signalAll() - case None => - throw new IllegalStateException(s"State for partition $topicPartition should exist.") - case s => - throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.") + topicPartitions.foreach { + topicPartition => + inProgress.get(topicPartition) match { + case Some(LogCleaningInProgress) => + inProgress.remove(topicPartition) + case Some(LogCleaningAborted) => + inProgress.put(topicPartition, LogCleaningPaused) + 
pausedCleaningCond.signalAll() + case None => + throw new IllegalStateException(s"State for partition $topicPartition should exist.") + case s => + throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.") + } } } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 32203acde9a..eab85098474 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -515,8 +515,10 @@ class LogManager(logDirs: Seq[File], if (needToStopCleaner && !isFuture) cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) } finally { - if (needToStopCleaner && !isFuture) - cleaner.resumeCleaning(topicPartition) + if (needToStopCleaner && !isFuture) { + cleaner.resumeCleaning(Seq(topicPartition)) + info(s"Compaction for partition $topicPartition is resumed") + } } } } @@ -547,7 +549,8 @@ class LogManager(logDirs: Seq[File], log.truncateFullyAndStartAt(newOffset) if (cleaner != null && !isFuture) { cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) - cleaner.resumeCleaning(topicPartition) + cleaner.resumeCleaning(Seq(topicPartition)) + info(s"Compaction for partition $topicPartition is resumed") } checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile) } @@ -785,7 +788,8 @@ class LogManager(logDirs: Seq[File], currentLogs.put(topicPartition, destLog) if (cleaner != null) { cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile) - cleaner.resumeCleaning(topicPartition) + cleaner.resumeCleaning(Seq(topicPartition)) + info(s"Compaction for partition $topicPartition is resumed") } try { @@ -869,10 +873,38 @@ class LogManager(logDirs: Seq[File], debug("Beginning log cleanup...") var total = 0 val startMs = time.milliseconds - for(log <- allLogs; if !log.config.compact) { - debug("Garbage collecting '" + 
log.name + "'") - total += log.deleteOldSegments() + + // clean current logs. + val deletableLogs = { + if (cleaner != null) { + // prevent cleaner from working on same partitions when changing cleanup policy + cleaner.pauseCleaningForNonCompactedPartitions() + } else { + currentLogs.filter { + case (_, log) => !log.config.compact + } + } } + + try { + deletableLogs.foreach { + case (topicPartition, log) => + debug("Garbage collecting '" + log.name + "'") + total += log.deleteOldSegments() + + val futureLog = futureLogs.get(topicPartition) + if (futureLog != null) { + // clean future logs + debug("Garbage collecting future log '" + futureLog.name + "'") + total += futureLog.deleteOldSegments() + } + } + } finally { + if (cleaner != null) { + cleaner.resumeCleaning(deletableLogs.map(_._1)) + } + } + debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 7455763f5b7..8cb2f9ec874 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -90,6 +90,58 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete) } + /** + * log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy + * is changed between "compact" and "delete" + */ + @Test + def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete) + val cleanerManager: LogCleanerManager = createCleanerManager(log) + + log.appendAsLeader(records, leaderEpoch = 0) + log.roll() + log.appendAsLeader(records, leaderEpoch = 0) 
+ log.onHighWatermarkIncremented(2L) + + // simulate retention thread working on the log partition + val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions() + assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size) + + // change cleanup policy from delete to compact + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, log.config.segmentSize) + logProps.put(LogConfig.RetentionMsProp, log.config.retentionMs) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0: Integer) + val config = LogConfig(logProps) + log.config = config + + // log retention inprogress, the log is not available for compaction + val cleanable = cleanerManager.grabFilthiestCompactedLog(time) + assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size) + + // log retention finished, and log can be picked up for compaction + cleanerManager.resumeCleaning(deletableLog.map(_._1)) + val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time) + assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size) + + // update cleanup policy to delete + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete) + val config2 = LogConfig(logProps) + log.config = config2 + + // compaction in progress, should have 0 log eligible for log retention + val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions() + assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size) + + // compaction done, should have 1 log eligible for log retention + cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition)) + val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions() + assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size) + } + /** * Test computation of cleanable range with no minimum compaction lag settings active */ @@ -250,17 +302,17 @@ class LogCleanerManagerTest extends 
JUnitSuite with Logging { val tp = new TopicPartition("log", 0) - intercept[IllegalStateException](cleanerManager.doneDeleting(tp)) + intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp))) cleanerManager.setCleaningState(tp, LogCleaningPaused) - intercept[IllegalStateException](cleanerManager.doneDeleting(tp)) + intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp))) cleanerManager.setCleaningState(tp, LogCleaningInProgress) - cleanerManager.doneDeleting(tp) + cleanerManager.doneDeleting(Seq(tp)) assertTrue(cleanerManager.cleaningState(tp).isEmpty) cleanerManager.setCleaningState(tp, LogCleaningAborted) - cleanerManager.doneDeleting(tp) + cleanerManager.doneDeleting(Seq(tp)) assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix race condition between log cleaner thread and log retention thread when > topic cleanup policy is updated > ----------------------------------------------------------------------------------------------------------- > > Key: KAFKA-7322 > URL: https://issues.apache.org/jira/browse/KAFKA-7322 > Project: Kafka > Issue Type: Bug > Components: log > Reporter: xiongqi wu > Assignee: xiongqi wu > Priority: Major > Fix For: 2.1.0 > > > The deletion thread will grab the log.lock when it tries to rename log > segment and schedule for actual deletion. > The compaction thread only grabs the log.lock when it tries to replace the > original segments with the cleaned segment. The compaction thread doesn't > grab the log when it reads records from the original segments to build > offsetmap and new segments. As a result, if both deletion and compaction > threads work on the same log partition. 
We have a race condition. > This race happens when the topic cleanup policy is updated on the fly. > One case that hits this race condition: > 1: the topic cleanup policy is "compact" initially > 2: the log cleaner (compaction) thread picks up the partition for compaction and > is still in progress > 3: the topic cleanup policy is updated to "delete" > 4: the retention thread picks up the topic partition and deletes some old segments. > 5: the log cleaner thread reads from the deleted log and raises an IO exception. > > The proposed solution is to use the "inprogress" map that the cleaner manager has to > protect against such a race. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)