[ https://issues.apache.org/jira/browse/KAFKA-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639070#comment-16639070 ]
ASF GitHub Bot commented on KAFKA-7441:
---------------------------------------

lindong28 closed pull request #5694: KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently
URL: https://github.com/apache/kafka/pull/5694

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 680fa94e33e..abe02bebc9d 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -37,16 +37,24 @@ import scala.collection.{Iterable, immutable, mutable}
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
-private[log] case object LogCleaningPaused extends LogCleaningState
+private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
 /**
- *  Manage the state of each partition being cleaned.
- *  If a partition is to be cleaned, it enters the LogCleaningInProgress state.
- *  While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
- *  the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
- *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
- *  requested to be resumed.
- */
+  * This class manages the state of each partition being cleaned.
+  * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+  * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+  *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+  * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+  *                              or become LogCleaningAborted. Valid previous state is None.
+  * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+  *                              Valid previous state is LogCleaningInProgress.
+  * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+  *                            : In this state, it can become None or LogCleaningPaused(2).
+  *                            : Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+  * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+  *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+  *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+  */
 private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
@@ -164,7 +172,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       }
 
       deletableLogs.foreach {
-        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused(1))
       }
       deletableLogs
     }
@@ -207,22 +215,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    *    throws a LogCleaningAbortedException to stop the cleaning task.
    * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
    * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   * 6. If the partition is already paused (by log retention), a new call to this function
+   *    will increase the paused count by one.
    */
   def abortAndPauseCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       inProgress.get(topicPartition) match {
         case None =>
-          inProgress.put(topicPartition, LogCleaningPaused)
-        case Some(state) =>
-          state match {
-            case LogCleaningInProgress =>
-              inProgress.put(topicPartition, LogCleaningAborted)
-            case LogCleaningPaused =>
-            case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
-          }
+          inProgress.put(topicPartition, LogCleaningPaused(1))
+        case Some(LogCleaningInProgress) =>
+          inProgress.put(topicPartition, LogCleaningAborted)
+        case Some(LogCleaningPaused(count)) =>
+          inProgress.put(topicPartition, LogCleaningPaused(count + 1))
+        case Some(s) =>
+          throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
       }
-      while (!isCleaningInState(topicPartition, LogCleaningPaused))
+
+      while(!isCleaningInStatePaused(topicPartition))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
     info(s"The cleaning for partition $topicPartition is aborted and paused")
@@ -230,6 +239,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   /**
    * Resume the cleaning of paused partitions.
+   * Each call of this function will undo one pause.
    */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
     inLock(lock) {
@@ -240,8 +250,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
               throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
             case Some(state) =>
               state match {
-                case LogCleaningPaused =>
+                case LogCleaningPaused(count) if count == 1 =>
                   inProgress.remove(topicPartition)
+                case LogCleaningPaused(count) if count > 1 =>
+                  inProgress.put(topicPartition, LogCleaningPaused(count - 1))
                 case s =>
                   throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
               }
@@ -264,6 +276,22 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * Check if the cleaning for a partition is paused. The caller is expected to hold lock while making the call.
+   */
+  private def isCleaningInStatePaused(topicPartition: TopicPartition): Boolean = {
+    inProgress.get(topicPartition) match {
+      case None => false
+      case Some(state) =>
+        state match {
+          case LogCleaningPaused(s) =>
+            true
+          case _ =>
+            false
+        }
+    }
+  }
+
   /**
    * Check if the cleaning for a partition is aborted. If so, throw an exception.
    */
@@ -337,7 +365,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           updateCheckpoints(dataDir, Option(topicPartition, endOffset))
           inProgress.remove(topicPartition)
         case Some(LogCleaningAborted) =>
-          inProgress.put(topicPartition, LogCleaningPaused)
+          inProgress.put(topicPartition, LogCleaningPaused(1))
           pausedCleaningCond.signalAll()
         case None =>
           throw new IllegalStateException(s"State for partition $topicPartition should exist.")
@@ -355,7 +383,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           case Some(LogCleaningInProgress) =>
             inProgress.remove(topicPartition)
           case Some(LogCleaningAborted) =>
-            inProgress.put(topicPartition, LogCleaningPaused)
+            inProgress.put(topicPartition, LogCleaningPaused(1))
             pausedCleaningCond.signalAll()
           case None =>
             throw new IllegalStateException(s"State for partition $topicPartition should exist.")
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index eab85098474..bcf380154a4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -546,11 +546,16 @@ class LogManager(logDirs: Seq[File],
         //Abort and pause the cleaning of the log, and resume after truncation is done.
         if (cleaner != null && !isFuture)
           cleaner.abortAndPauseCleaning(topicPartition)
-        log.truncateFullyAndStartAt(newOffset)
-        if (cleaner != null && !isFuture) {
-          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
-          cleaner.resumeCleaning(Seq(topicPartition))
-          info(s"Compaction for partition $topicPartition is resumed")
+        try {
+          log.truncateFullyAndStartAt(newOffset)
+          if (cleaner != null && !isFuture) {
+            cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+          }
+        } finally {
+          if (cleaner != null && !isFuture) {
+            cleaner.resumeCleaning(Seq(topicPartition))
+            info(s"Compaction for partition $topicPartition is resumed")
+          }
         }
         checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3653e282383..2a4869098e7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -91,11 +91,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-    * log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy
-    * is changed between "compact" and "delete"
+    * log under cleanup should be ineligible for compaction
     */
   @Test
-  def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = {
+  def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
@@ -105,7 +104,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     log.appendAsLeader(records, leaderEpoch = 0)
     log.onHighWatermarkIncremented(2L)
 
-    // simulate retention thread working on the log partition
+    // simulate cleanup thread working on the log partition
     val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size)
 
@@ -118,11 +117,11 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config = LogConfig(logProps)
     log.config = config
 
-    // log retention inprogress, the log is not available for compaction
+    // log cleanup inprogress, the log is not available for compaction
     val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
 
-    // log retention finished, and log can be picked up for compaction
+    // log cleanup finished, and log can be picked up for compaction
     cleanerManager.resumeCleaning(deletableLog.map(_._1))
     val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size)
@@ -132,16 +131,55 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config2 = LogConfig(logProps)
     log.config = config2
 
-    // compaction in progress, should have 0 log eligible for log retention
+    // compaction in progress, should have 0 log eligible for log cleanup
     val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size)
 
-    // compaction done, should have 1 log eligible for log retention
+    // compaction done, should have 1 log eligible for log cleanup
     cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
     val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  /**
+    * log under cleanup should still be eligible for log truncation
+    */
+  @Test
+  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Log truncation happens due to unclean leader election
+    cleanerManager.abortAndPauseCleaning(log.topicPartition)
+    cleanerManager.resumeCleaning(Seq(log.topicPartition))
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
+  /**
+    * log under cleanup should still be eligible for topic deletion
+    */
+  @Test
+  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Broker processes StopReplicaRequest with delete=true
+    cleanerManager.abortCleaning(log.topicPartition)
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
   /**
     * Test computation of cleanable range with no minimum compaction lag settings active
     */
@@ -280,7 +318,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val tp = new TopicPartition("log", 0)
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -290,7 +328,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
     assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
   }
 
@@ -304,7 +342,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -313,7 +351,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneDeleting(Seq(tp))
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
 
   }
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow LogCleanerManager.resumeCleaning() to be used concurrently
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7441
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7441
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Blocker
>             Fix For: 2.1.0
>
>
> LogCleanerManager provides APIs abortAndPauseCleaning(TopicPartition) and
> resumeCleaning(Iterable[TopicPartition]). The abortAndPauseCleaning(...) will
> do nothing if the partition is already in paused state. And
> resumeCleaning(..)
> will always clear the state for the partition if the
> partition is in paused state. Also, resumeCleaning(...) will throw
> IllegalStateException if the partition does not have any state (e.g. its
> state is cleared).
>
> This will cause a problem in the following scenario:
> 1) Background thread invokes LogManager.cleanupLogs() which in turn does
> abortAndPauseCleaning(...) for a given partition. Now this partition is in
> paused state.
> 2) User requests deletion for this partition. Controller sends
> StopReplicaRequest with delete=true for this partition. RequestHandlerThread
> calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same
> partition. Now there is no state for this partition.
> 3) Background thread invokes resumeCleaning(...) as part of
> LogManager.cleanupLogs(). Because there is no state for this partition, it
> causes IllegalStateException.
>
> This issue can also happen before KAFKA-7322 if unclean leader election
> triggers log truncation for a partition at the same time that the partition
> is deleted upon user request. But unclean leader election is very rare. The
> fix made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue
> much more frequent.
> The solution is to record the number of pauses.
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
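
For readers following the issue description, the pause-counting idea can be sketched as a small standalone model. This is only an illustration under assumptions, not the Kafka class: `PauseCounter`, `CleaningState`, `InProgress`, `Paused`, `pause`, `resume` and the `"tp-0"` key are hypothetical names, and plain `synchronized` stands in for the `inLock(lock)` / `pausedCleaningCond` handling shown in the diff. The abort path (in progress, then aborted, then paused) is left out on purpose.

```scala
import scala.collection.mutable

// Hypothetical, simplified states; the real ones live in LogCleanerManager.scala.
sealed trait CleaningState
case object InProgress extends CleaningState
final case class Paused(count: Int) extends CleaningState

// Hypothetical model of "record the number of pauses" for keys of type K.
final class PauseCounter[K] {
  private val states = mutable.Map.empty[K, CleaningState]

  def state(key: K): Option[CleaningState] = synchronized(states.get(key))

  // Test helper for this sketch only: force a key into a given state.
  def set(key: K, s: CleaningState): Unit = synchronized { states.update(key, s) }

  // Each pause on an idle or already paused key adds one to the count.
  def pause(key: K): Unit = synchronized {
    states.get(key) match {
      case None            => states.update(key, Paused(1))
      case Some(Paused(n)) => states.update(key, Paused(n + 1))
      case Some(s)         => throw new IllegalStateException(s"$key cannot be paused in state $s")
    }
  }

  // Each resume undoes exactly one pause; the state is cleared only at count 1.
  def resume(key: K): Unit = synchronized {
    states.get(key) match {
      case Some(Paused(1))          => states.remove(key)
      case Some(Paused(n)) if n > 1 => states.update(key, Paused(n - 1))
      case other                    => throw new IllegalStateException(s"$key cannot be resumed in state $other")
    }
  }
}

object PauseCounterDemo {
  def main(args: Array[String]): Unit = {
    val m = new PauseCounter[String]
    m.pause("tp-0")  // 1) background cleanup pauses the partition
    m.pause("tp-0")  // 2) request handler pauses it again: Paused(2)
    m.resume("tp-0") //    request handler resumes: back to Paused(1)
    m.resume("tp-0") // 3) background cleanup resumes: state cleared, no exception
    assert(m.state("tp-0").isEmpty)
  }
}
```

With a single boolean-like paused state, step 3 would find no state and throw; with a count, the two pause/resume pairs can interleave in any order as long as every pause is matched by one resume.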