[ https://issues.apache.org/jira/browse/KAFKA-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639070#comment-16639070 ]

ASF GitHub Bot commented on KAFKA-7441:
---------------------------------------

lindong28 closed pull request #5694: KAFKA-7441; Allow LogCleanerManager.resumeCleaning() to be used concurrently
URL: https://github.com/apache/kafka/pull/5694
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 680fa94e33e..abe02bebc9d 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -37,16 +37,24 @@ import scala.collection.{Iterable, immutable, mutable}
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
-private[log] case object LogCleaningPaused extends LogCleaningState
+private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
 /**
- *  Manage the state of each partition being cleaned.
- *  If a partition is to be cleaned, it enters the LogCleaningInProgress state.
- *  While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
- *  the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
- *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
- *  requested to be resumed.
- */
+  * This class manages the state of each partition being cleaned.
+  * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+  * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+  *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+  * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+  *                              or become LogCleaningAborted. Valid previous state is None.
+  * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+  *                              Valid previous state is LogCleaningInProgress.
+  * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+  *                            : In this state, it can become None or LogCleaningPaused(2).
+  *                            : Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+  * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+  *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+  *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+  */
 private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
@@ -164,7 +172,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       }
 
       deletableLogs.foreach {
-        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused(1))
       }
       deletableLogs
     }
@@ -207,22 +215,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    *     throws a LogCleaningAbortedException to stop the cleaning task.
    *  4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
    *  5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   *  6. If the partition is already paused (by log retention), a new call to this function
+   *     will increase the paused count by one.
    */
   def abortAndPauseCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       inProgress.get(topicPartition) match {
         case None =>
-          inProgress.put(topicPartition, LogCleaningPaused)
-        case Some(state) =>
-          state match {
-            case LogCleaningInProgress =>
-              inProgress.put(topicPartition, LogCleaningAborted)
-            case LogCleaningPaused =>
-            case s =>
-              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
-          }
+          inProgress.put(topicPartition, LogCleaningPaused(1))
+        case Some(LogCleaningInProgress) =>
+          inProgress.put(topicPartition, LogCleaningAborted)
+        case Some(LogCleaningPaused(count)) =>
+          inProgress.put(topicPartition, LogCleaningPaused(count + 1))
+        case Some(s) =>
+          throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be aborted and paused since it is in $s state.")
       }
-      while (!isCleaningInState(topicPartition, LogCleaningPaused))
+
+      while(!isCleaningInStatePaused(topicPartition))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
     info(s"The cleaning for partition $topicPartition is aborted and paused")
@@ -230,6 +239,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   /**
     *  Resume the cleaning of paused partitions.
+    *  Each call of this function will undo one pause.
     */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
     inLock(lock) {
@@ -240,8 +250,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
              throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is not paused.")
            case Some(state) =>
              state match {
-                case LogCleaningPaused =>
+                case LogCleaningPaused(count) if count == 1 =>
                   inProgress.remove(topicPartition)
+                case LogCleaningPaused(count) if count > 1 =>
+                  inProgress.put(topicPartition, LogCleaningPaused(count - 1))
                 case s =>
                   throw new IllegalStateException(s"Compaction for partition $topicPartition cannot be resumed since it is in $s state.")
               }
@@ -264,6 +276,22 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   *  Check if the cleaning for a partition is paused. The caller is expected to hold lock while making the call.
+   */
+  private def isCleaningInStatePaused(topicPartition: TopicPartition): Boolean = {
+    inProgress.get(topicPartition) match {
+      case None => false
+      case Some(state) =>
+        state match {
+          case LogCleaningPaused(s) =>
+            true
+          case _ =>
+            false
+        }
+    }
+  }
+
   /**
    *  Check if the cleaning for a partition is aborted. If so, throw an exception.
    */
@@ -337,7 +365,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           updateCheckpoints(dataDir, Option(topicPartition, endOffset))
           inProgress.remove(topicPartition)
         case Some(LogCleaningAborted) =>
-          inProgress.put(topicPartition, LogCleaningPaused)
+          inProgress.put(topicPartition, LogCleaningPaused(1))
           pausedCleaningCond.signalAll()
         case None =>
           throw new IllegalStateException(s"State for partition $topicPartition should exist.")
@@ -355,7 +383,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
             case Some(LogCleaningInProgress) =>
               inProgress.remove(topicPartition)
             case Some(LogCleaningAborted) =>
-              inProgress.put(topicPartition, LogCleaningPaused)
+              inProgress.put(topicPartition, LogCleaningPaused(1))
               pausedCleaningCond.signalAll()
             case None =>
              throw new IllegalStateException(s"State for partition $topicPartition should exist.")
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index eab85098474..bcf380154a4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -546,11 +546,16 @@ class LogManager(logDirs: Seq[File],
        //Abort and pause the cleaning of the log, and resume after truncation is done.
      if (cleaner != null && !isFuture)
        cleaner.abortAndPauseCleaning(topicPartition)
-      log.truncateFullyAndStartAt(newOffset)
-      if (cleaner != null && !isFuture) {
-        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
-        cleaner.resumeCleaning(Seq(topicPartition))
-        info(s"Compaction for partition $topicPartition is resumed")
+      try {
+        log.truncateFullyAndStartAt(newOffset)
+        if (cleaner != null && !isFuture) {
+          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+        }
+      } finally {
+        if (cleaner != null && !isFuture) {
+          cleaner.resumeCleaning(Seq(topicPartition))
+          info(s"Compaction for partition $topicPartition is resumed")
+        }
       }
       checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3653e282383..2a4869098e7 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -91,11 +91,10 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   /**
-    * log with retention in progress should not be picked up for compaction and vice versa when log cleanup policy
-    * is changed between "compact" and "delete"
+    * log under cleanup should be ineligible for compaction
     */
   @Test
-  def testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): Unit = {
+  def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
@@ -105,7 +104,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     log.appendAsLeader(records, leaderEpoch = 0)
     log.onHighWatermarkIncremented(2L)
 
-    // simulate retention thread working on the log partition
+    // simulate cleanup thread working on the log partition
     val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog.size)
 
@@ -118,11 +117,11 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config = LogConfig(logProps)
     log.config = config
 
-    // log retention inprogress, the log is not available for compaction
+    // log cleanup inprogress, the log is not available for compaction
     val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
 
-    // log retention finished, and log can be picked up for compaction
+    // log cleanup finished, and log can be picked up for compaction
     cleanerManager.resumeCleaning(deletableLog.map(_._1))
     val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals("should have 1 logs ready to be compacted", 1, cleanable2.size)
@@ -132,16 +131,55 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val config2 = LogConfig(logProps)
     log.config = config2
 
-    // compaction in progress, should have 0 log eligible for log retention
+    // compaction in progress, should have 0 log eligible for log cleanup
     val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 0 logs ready to be deleted", 0, deletableLog2.size)
 
-    // compaction done, should have 1 log eligible for log retention
+    // compaction done, should have 1 log eligible for log cleanup
     cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
     val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  /**
+    * log under cleanup should still be eligible for log truncation
+    */
+  @Test
+  def testConcurrentLogCleanupAndLogTruncation(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Log truncation happens due to unclean leader election
+    cleanerManager.abortAndPauseCleaning(log.topicPartition)
+    cleanerManager.resumeCleaning(Seq(log.topicPartition))
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
+  /**
+    * log under cleanup should still be eligible for topic deletion
+    */
+  @Test
+  def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // log cleanup starts
+    val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    // Broker processes StopReplicaRequest with delete=true
+    cleanerManager.abortCleaning(log.topicPartition)
+    // log cleanup finishes and pausedPartitions are resumed
+    cleanerManager.resumeCleaning(pausedPartitions.map(_._1))
+
+    assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
+  }
+
   /**
     * Test computation of cleanable range with no minimum compaction lag settings active
     */
@@ -280,7 +318,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val tp = new TopicPartition("log", 0)
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -290,7 +328,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
     assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
   }
 
@@ -304,7 +342,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused)
+    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
@@ -313,7 +351,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneDeleting(Seq(tp))
-    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
 
   }
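
For readers skimming the diff above: the state machine documented in the new
LogCleanerManager scaladoc can be summarised with a small standalone Scala
sketch. This is an illustrative model only, not code from the PR; the
CleaningStateModel object and its method names are hypothetical, while the
state types mirror the ones in the diff.

sealed trait LogCleaningState
case object LogCleaningInProgress extends LogCleaningState
case object LogCleaningAborted extends LogCleaningState
case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState

// Hypothetical pure model of the transitions described in the scaladoc above.
object CleaningStateModel {

  // abortAndPauseCleaning(): None -> Paused(1), InProgress -> Aborted,
  // Paused(i) -> Paused(i + 1). (doneCleaning() later turns Aborted into Paused(1).)
  def abortAndPause(state: Option[LogCleaningState]): Option[LogCleaningState] = state match {
    case None                        => Some(LogCleaningPaused(1))
    case Some(LogCleaningInProgress) => Some(LogCleaningAborted)
    case Some(LogCleaningPaused(i))  => Some(LogCleaningPaused(i + 1))
    case Some(s)                     => throw new IllegalStateException(s"cannot pause in $s state")
  }

  // resumeCleaning(): Paused(1) -> None, Paused(i) -> Paused(i - 1) for i > 1.
  def resume(state: Option[LogCleaningState]): Option[LogCleaningState] = state match {
    case Some(LogCleaningPaused(1))          => None
    case Some(LogCleaningPaused(i)) if i > 1 => Some(LogCleaningPaused(i - 1))
    case other                               => throw new IllegalStateException(s"cannot resume in $other state")
  }
}

Because pauses are counted, resume(abortAndPause(abortAndPause(None))) yields
Some(LogCleaningPaused(1)) rather than None, so each caller can later undo the
one pause it took out without clobbering the other's.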
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow LogCleanerManager.resumeCleaning() to be used concurrently
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7441
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7441
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Blocker
>             Fix For: 2.1.0
>
>
> LogCleanerManager provides the APIs abortAndPauseCleaning(TopicPartition) and 
> resumeCleaning(Iterable[TopicPartition]). abortAndPauseCleaning(...) does 
> nothing if the partition is already in the paused state, and 
> resumeCleaning(...) always clears the state for the partition if the 
> partition is in the paused state. Also, resumeCleaning(...) throws an 
> IllegalStateException if the partition does not have any state (e.g. its 
> state has been cleared).
>  
> This causes a problem in the following scenario:
> 1) A background thread invokes LogManager.cleanupLogs(), which in turn calls 
> abortAndPauseCleaning(...) for a given partition. Now this partition is in 
> the paused state.
> 2) A user requests deletion of this partition. The controller sends a 
> StopReplicaRequest with delete=true for this partition. A RequestHandlerThread 
> calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same 
> partition. Now there is no state for this partition.
> 3) The background thread invokes resumeCleaning(...) as part of 
> LogManager.cleanupLogs(). Because there is no state for this partition, it 
> throws an IllegalStateException.
>  
> This issue could also happen before KAFKA-7322, if an unclean leader election 
> triggered log truncation for a partition at the same time that the partition 
> was deleted upon user request. But unclean leader election is very rare. The 
> fix made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue 
> much more frequent.
> The solution is to record the number of pauses.
>  
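
To make the failure and the fix concrete, here is a hedged sketch written in
the style of the PR's new tests. It assumes the createLog/createCleanerManager
helpers from LogCleanerManagerTest; the test name and the step comments are
illustrative, and the call sequence essentially mirrors the new
testConcurrentLogCleanupAndLogTruncation test in the PR.

@Test
def pausedCountSurvivesConcurrentPauseAndResume(): Unit = {
  val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
  val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
  val cleanerManager: LogCleanerManager = createCleanerManager(log)

  // Step 1: the background cleanup thread pauses the partition -> LogCleaningPaused(1).
  val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()

  // Step 2: another thread pauses and resumes the same partition.
  // With counted pauses this goes Paused(1) -> Paused(2) -> Paused(1);
  // before the fix, the pause was a no-op and the resume cleared the state.
  cleanerManager.abortAndPauseCleaning(log.topicPartition)
  cleanerManager.resumeCleaning(Seq(log.topicPartition))

  // Step 3: the background thread resumes -> the state is removed. Before the
  // fix, this call threw IllegalStateException because the state was already gone.
  cleanerManager.resumeCleaning(pausedPartitions.map(_._1))

  assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
}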



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
