[ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618552#comment-16618552
 ] 

ASF GitHub Bot commented on KAFKA-7322:
---------------------------------------

lindong28 closed pull request #5591: KAFKA-7322: Fix race condition between log 
cleaner thread and log retention thread when topic cleanup policy is updated
URL: https://github.com/apache/kafka/pull/5591
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 91ddbf09305..0b4abe80ef1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Iterable, Set, mutable}
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which 
have the "compact" retention strategy.
@@ -219,10 +219,10 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the 
cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
-    cleanerManager.resumeCleaning(topicPartition)
+    *  Resume the cleaning of paused partitions.
+    */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]) {
+    cleanerManager.resumeCleaning(topicPartitions)
   }
 
   /**
@@ -246,6 +246,15 @@ class LogCleaner(initialConfig: CleanerConfig,
     isCleaned
   }
 
+  /**
+    * To prevent race between retention and compaction,
+    * retention threads need to make this call to obtain:
+    * @return A list of log partitions that retention threads can safely work 
on
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, 
Log)] = {
+    cleanerManager.pauseCleaningForNonCompactedPartitions()
+  }
+
   // Only for testing
   private[kafka] def currentConfig: CleanerConfig = config
 
@@ -315,14 +324,16 @@ class LogCleaner(initialConfig: CleanerConfig,
           true
       }
       val deletable: Iterable[(TopicPartition, Log)] = 
cleanerManager.deletableLogs()
-      deletable.foreach{
-        case (topicPartition, log) =>
-          try {
+
+      try {
+        deletable.foreach {
+          case (_, log) =>
             log.deleteOldSegments()
-          } finally {
-            cleanerManager.doneDeleting(topicPartition)
-          }
+        }
+      } finally {
+        cleanerManager.doneDeleting(deletable.map(_._1))
       }
+
       if (!cleaned)
         pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ba8d7c7e9c0..83d902f952a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{immutable, mutable}
+import scala.collection.{Iterable, immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -148,6 +148,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
+  /**
+    * Pause logs cleaning for logs that do not have compaction enabled
+    * and do not have other deletion or compaction in progress.
+    * This is to handle potential race between retention and cleaner threads 
when users
+    * switch topic configuration between compacted and non-compacted topic.
+    * @return retention logs that have log cleaning successfully paused
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, 
Log)] = {
+    inLock(lock) {
+      val deletableLogs = logs.filter {
+        case (_, log) => !log.config.compact // pick non-compacted logs
+      }.filterNot {
+        case (topicPartition, _) => inProgress.contains(topicPartition) // 
skip any logs already in-progress
+      }
+
+      deletableLogs.foreach {
+        case (topicPartition, _) => inProgress.put(topicPartition, 
LogCleaningPaused)
+      }
+      deletableLogs
+    }
+  }
+
   /**
     * Find any logs that have compact and delete enabled
     */
@@ -170,7 +192,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def abortCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       abortAndPauseCleaning(topicPartition)
-      resumeCleaning(topicPartition)
+      resumeCleaning(Seq(topicPartition))
     }
     info(s"The cleaning for partition $topicPartition is aborted")
   }
@@ -206,23 +228,25 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the 
cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
+    *  Resume the cleaning of paused partitions.
+    */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]){
     inLock(lock) {
-      inProgress.get(topicPartition) match {
-        case None =>
-          throw new IllegalStateException(s"Compaction for partition 
$topicPartition cannot be resumed since it is not paused.")
-        case Some(state) =>
-          state match {
-            case LogCleaningPaused =>
-              inProgress.remove(topicPartition)
-            case s =>
-              throw new IllegalStateException(s"Compaction for partition 
$topicPartition cannot be resumed since it is in $s state.")
+      topicPartitions.foreach {
+        topicPartition =>
+          inProgress.get(topicPartition) match {
+            case None =>
+              throw new IllegalStateException(s"Compaction for partition 
$topicPartition cannot be resumed since it is not paused.")
+            case Some(state) =>
+              state match {
+                case LogCleaningPaused =>
+                  inProgress.remove(topicPartition)
+                case s =>
+                  throw new IllegalStateException(s"Compaction for partition 
$topicPartition cannot be resumed since it is in $s state.")
+              }
           }
       }
     }
-    info(s"Compaction for partition $topicPartition is resumed")
   }
 
   /**
@@ -322,18 +346,21 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def doneDeleting(topicPartition: TopicPartition): Unit = {
+  def doneDeleting(topicPartitions: Iterable[TopicPartition]): Unit = {
     inLock(lock) {
-      inProgress.get(topicPartition) match {
-        case Some(LogCleaningInProgress) =>
-          inProgress.remove(topicPartition)
-        case Some(LogCleaningAborted) =>
-          inProgress.put(topicPartition, LogCleaningPaused)
-          pausedCleaningCond.signalAll()
-        case None =>
-          throw new IllegalStateException(s"State for partition 
$topicPartition should exist.")
-        case s =>
-          throw new IllegalStateException(s"In-progress partition 
$topicPartition cannot be in $s state.")
+      topicPartitions.foreach {
+        topicPartition =>
+          inProgress.get(topicPartition) match {
+            case Some(LogCleaningInProgress) =>
+              inProgress.remove(topicPartition)
+            case Some(LogCleaningAborted) =>
+              inProgress.put(topicPartition, LogCleaningPaused)
+              pausedCleaningCond.signalAll()
+            case None =>
+              throw new IllegalStateException(s"State for partition 
$topicPartition should exist.")
+            case s =>
+              throw new IllegalStateException(s"In-progress partition 
$topicPartition cannot be in $s state.")
+          }
       }
     }
   }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 32203acde9a..eab85098474 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -515,8 +515,10 @@ class LogManager(logDirs: Seq[File],
           if (needToStopCleaner && !isFuture)
             cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, 
topicPartition, log.activeSegment.baseOffset)
         } finally {
-          if (needToStopCleaner && !isFuture)
-            cleaner.resumeCleaning(topicPartition)
+          if (needToStopCleaner && !isFuture) {
+            cleaner.resumeCleaning(Seq(topicPartition))
+            info(s"Compaction for partition $topicPartition is resumed")
+          }
         }
       }
     }
@@ -547,7 +549,8 @@ class LogManager(logDirs: Seq[File],
       log.truncateFullyAndStartAt(newOffset)
       if (cleaner != null && !isFuture) {
         cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, 
log.activeSegment.baseOffset)
-        cleaner.resumeCleaning(topicPartition)
+        cleaner.resumeCleaning(Seq(topicPartition))
+        info(s"Compaction for partition $topicPartition is resumed")
       }
       checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
     }
@@ -785,7 +788,8 @@ class LogManager(logDirs: Seq[File],
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
         cleaner.alterCheckpointDir(topicPartition, 
sourceLog.dir.getParentFile, destLog.dir.getParentFile)
-        cleaner.resumeCleaning(topicPartition)
+        cleaner.resumeCleaning(Seq(topicPartition))
+        info(s"Compaction for partition $topicPartition is resumed")
       }
 
       try {
@@ -869,10 +873,38 @@ class LogManager(logDirs: Seq[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.compact) {
-      debug("Garbage collecting '" + log.name + "'")
-      total += log.deleteOldSegments()
+
+    // clean current logs.
+    val deletableLogs = {
+      if (cleaner != null) {
+        // prevent cleaner from working on same partitions when changing 
cleanup policy
+        cleaner.pauseCleaningForNonCompactedPartitions()
+      } else {
+        currentLogs.filter {
+          case (_, log) => !log.config.compact
+        }
+      }
     }
+
+    try {
+      deletableLogs.foreach {
+        case (topicPartition, log) =>
+          debug("Garbage collecting '" + log.name + "'")
+          total += log.deleteOldSegments()
+
+          val futureLog = futureLogs.get(topicPartition)
+          if (futureLog != null) {
+            // clean future logs
+            debug("Garbage collecting future log '" + futureLog.name + "'")
+            total += futureLog.deleteOldSegments()
+          }
+      }
+    } finally {
+      if (cleaner != null) {
+        cleaner.resumeCleaning(deletableLogs.map(_._1))
+      }
+    }
+
     debug("Log cleanup completed. " + total + " files deleted in " +
                   (time.milliseconds - startMs) / 1000 + " seconds")
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 7455763f5b7..8cb2f9ec874 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -90,6 +90,58 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
   }
 
+  /**
+    * log with retention in progress should not be picked up for compaction 
and vice versa when log cleanup policy
+    * is changed between "compact" and "delete"
+    */
+  @Test
+  def 
testLogsWithRetentionInprogressShouldNotPickedUpForCompactionAndViceVersa(): 
Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    log.appendAsLeader(records, leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(records, leaderEpoch = 0)
+    log.onHighWatermarkIncremented(2L)
+
+    // simulate retention thread working on the log partition
+    val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog.size)
+
+    // change cleanup policy from delete to compact
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, log.config.segmentSize)
+    logProps.put(LogConfig.RetentionMsProp, log.config.retentionMs)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0: Integer)
+    val config = LogConfig(logProps)
+    log.config = config
+
+    // log retention inprogress, the log is not available for compaction
+    val cleanable = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals("should have 0 logs ready to be compacted", 0, cleanable.size)
+
+    // log retention finished, and log can be picked up for compaction
+    cleanerManager.resumeCleaning(deletableLog.map(_._1))
+    val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time)
+    assertEquals("should have 1 logs ready to be compacted", 1, 
cleanable2.size)
+
+    // update cleanup policy to delete
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+    val config2 = LogConfig(logProps)
+    log.config = config2
+
+    // compaction in progress, should have 0 log eligible for log retention
+    val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 0 logs ready to be deleted", 0, 
deletableLog2.size)
+
+    // compaction done, should have 1 log eligible for log retention
+    cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition))
+    val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
+    assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
+  }
+
   /**
     * Test computation of cleanable range with no minimum compaction lag 
settings active
     */
@@ -250,17 +302,17 @@ class LogCleanerManagerTest extends JUnitSuite with 
Logging {
 
     val tp = new TopicPartition("log", 0)
 
-    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))
+    intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningPaused)
-    intercept[IllegalStateException](cleanerManager.doneDeleting(tp))
+    intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
 
     cleanerManager.setCleaningState(tp, LogCleaningInProgress)
-    cleanerManager.doneDeleting(tp)
+    cleanerManager.doneDeleting(Seq(tp))
     assertTrue(cleanerManager.cleaningState(tp).isEmpty)
 
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
-    cleanerManager.doneDeleting(tp)
+    cleanerManager.doneDeleting(Seq(tp))
     assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
 
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7322
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>             Fix For: 2.1.0
>
>
> The deletion thread grabs the log.lock when it renames a log segment and 
> schedules it for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log.lock when it reads records from the original segments to build 
> the offset map and new segments. As a result, if both the deletion and 
> compaction threads work on the same log partition, we have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.
> One sequence of events that hits this race condition:
> 1: The topic cleanup policy is "compact" initially.
> 2: The log cleaner (compaction) thread picks up the partition for compaction 
> and is still in progress.
> 3: The topic cleanup policy is updated to "delete".
> 4: The retention thread picks up the topic partition and deletes some old 
> segments.
> 5: The log cleaner thread reads from the deleted segments and raises an IO 
> exception.
>  
> The proposed solution is to use the "inProgress" map that the cleaner manager 
> maintains to protect against such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to