[ https://issues.apache.org/jira/browse/KAFKA-6568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368075#comment-16368075 ]

ASF GitHub Bot commented on KAFKA-6568:
---------------------------------------

hachikuji closed pull request #4580: KAFKA-6568; The log cleaner should check 
the partition state before r…
URL: https://github.com/apache/kafka/pull/4580
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index c3d3892aef9..b23107be491 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -96,6 +96,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+    * Package private for unit test. Get the cleaning state of the partition.
+    */
+  private[log] def cleaningState(tp: TopicPartition): Option[LogCleaningState] 
= {
+    inLock(lock) {
+      inProgress.get(tp)
+    }
+  }
+
+  /**
+    * Package private for unit test. Set the cleaning state of the partition.
+    */
+  private[log] def setCleaningState(tp: TopicPartition, state: 
LogCleaningState): Unit = {
+    inLock(lock) {
+      inProgress.put(tp, state)
+    }
+  }
 
    /**
     * Choose the log to clean next and add it to the in-progress set. We 
recompute this
@@ -290,11 +307,11 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
    */
   def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: 
Long) {
     inLock(lock) {
-      inProgress(topicPartition) match {
-        case LogCleaningInProgress =>
+      inProgress.get(topicPartition) match {
+        case Some(LogCleaningInProgress) =>
           updateCheckpoints(dataDir, Option(topicPartition, endOffset))
           inProgress.remove(topicPartition)
-        case LogCleaningAborted =>
+        case Some(LogCleaningAborted) =>
           inProgress.put(topicPartition, LogCleaningPaused)
           pausedCleaningCond.signalAll()
         case s =>
@@ -305,7 +322,15 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
 
   def doneDeleting(topicPartition: TopicPartition): Unit = {
     inLock(lock) {
-      inProgress.remove(topicPartition)
+      inProgress.get(topicPartition) match {
+        case Some(LogCleaningInProgress) =>
+          inProgress.remove(topicPartition)
+        case Some(LogCleaningAborted) =>
+          inProgress.put(topicPartition, LogCleaningPaused)
+          pausedCleaningCond.signalAll()
+        case s =>
+          throw new IllegalStateException(s"In-progress partition 
$topicPartition cannot be in $s state.")
+      }
     }
   }
 }
@@ -344,7 +369,7 @@ private[log] object LogCleanerManager extends Logging {
         offset
       }
     }
-    
+
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 114602919ea..42a447a2b16 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -215,6 +215,76 @@ class LogCleanerManagerTest extends JUnitSuite with 
Logging {
     assertEquals(4L, cleanableOffsets._2)
   }
 
+  @Test
+  def testDoneCleaning(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+    while(log.numberOfSegments < 8)
+      log.appendAsLeader(records(log.logEndOffset.toInt, 
log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val tp = new TopicPartition("log", 0)
+    try {
+      cleanerManager.doneCleaning(tp, log.dir, 1)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    try {
+      cleanerManager.setCleaningState(tp, LogCleaningPaused)
+      cleanerManager.doneCleaning(tp, log.dir, 1)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+    cleanerManager.doneCleaning(tp, log.dir, 1)
+    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+
+    cleanerManager.setCleaningState(tp, LogCleaningAborted)
+    cleanerManager.doneCleaning(tp, log.dir, 1)
+    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+  }
+
+  @Test
+  def testDoneDeleting(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," 
+ LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val tp = new TopicPartition("log", 0)
+
+    try {
+      cleanerManager.doneDeleting(tp)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    try {
+      cleanerManager.setCleaningState(tp, LogCleaningPaused)
+      cleanerManager.doneDeleting(tp)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+    cleanerManager.doneDeleting(tp)
+    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+
+    cleanerManager.setCleaningState(tp, LogCleaningAborted)
+    cleanerManager.doneDeleting(tp)
+    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+
+  }
+
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
     logs.put(new TopicPartition("log", 0), log)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogCleanerManager.doneDeleting() should check the partition state before 
> deleting the in progress partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6568
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6568
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.1, 0.10.2.1, 1.0.0, 0.11.0.2
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> {{LogCleanerManager.doneDeleting()}} removes the partition from the 
> {{inProgress}} map without first checking whether the partition has been 
> paused. This causes the paused partition state to be lost, and may also 
> cause another thread calling {{LogCleanerManager.abortAndPauseCleaning()}} 
> to block indefinitely while waiting for the partition state to become paused.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to