[ https://issues.apache.org/jira/browse/KAFKA-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412790#comment-16412790 ]

ASF GitHub Bot commented on KAFKA-6710:
---------------------------------------

rajinisivaram closed pull request #4771: KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs
URL: https://github.com/apache/kafka/pull/4771
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7aa5bcd88d8..f26a84c6244 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,13 +79,15 @@ class LogManager(logDirs: Seq[File],
   private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
-  @volatile var currentDefaultConfig = initialDefaultConfig
+  @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
 
   def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
-    this.currentDefaultConfig = logConfig
+    this._currentDefaultConfig = logConfig
   }
 
+  def currentDefaultConfig: LogConfig = _currentDefaultConfig
+
   def liveLogDirs: Seq[File] = {
     if (_liveLogDirs.size == logDirs.size)
       logDirs
@@ -245,6 +247,9 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
+  // Only for testing
+  private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -704,17 +709,27 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   *  Delete logs marked for deletion.
+   *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
+   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
+   *  considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
+   *  after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
+   *  `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
    */
   private def deleteLogs(): Unit = {
+    var nextDelayMs = 0L
     try {
-      while (!logsToBeDeleted.isEmpty) {
-        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
+      def nextDeleteDelayMs: Long = {
+        if (!logsToBeDeleted.isEmpty) {
+          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
+          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+        } else
+          currentDefaultConfig.fileDeleteDelayMs
+      }
+
+      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
+        val (removedLog, _) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
-            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
-            if (waitingTimeMs > 0)
-              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in 
${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -730,7 +745,7 @@ class LogManager(logDirs: Seq[File],
       try {
         scheduler.schedule("kafka-delete-logs",
           deleteLogs _,
-          delay = currentDefaultConfig.fileDeleteDelayMs,
+          delay = nextDelayMs,
           unit = TimeUnit.MILLISECONDS)
       } catch {
         case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 2fdda6b8827..d9efc236780 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -332,5 +332,10 @@ class LogManagerTest {
       assertNotEquals("File reference was not updated in index", 
fileBeforeDelete.getAbsolutePath,
         fileInIndex.get.getAbsolutePath)
     }
+
+    time.sleep(logManager.InitialTaskDelayMs)
+    assertTrue("Logs deleted too early", logManager.hasLogsToBeDeleted)
+    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+    assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
   }
 }
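
For readers piecing together the hunks above, here is a consolidated sketch of
the control flow the patch introduces (a simplified standalone illustration,
not the merged Kafka code: the Log case class and the schedule callback are
hypothetical stand-ins for kafka.log.Log and the KafkaScheduler):

    import java.util.concurrent.LinkedBlockingQueue

    object DeleteLogsSketch {
      // Stand-in for kafka.log.Log; only delete() matters here.
      final case class Log(name: String) {
        def delete(): Unit = println(s"deleted $name")
      }

      val fileDeleteDelayMs = 60 * 1000L
      val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()

      // Delete every log whose delay has elapsed, then hand the remaining
      // delay of the first undeleted log (or the default delay when the
      // queue is empty) back to the scheduler instead of blocking the
      // scheduler thread with Thread.sleep.
      def deleteLogs(now: () => Long, schedule: Long => Unit): Unit = {
        def nextDeleteDelayMs: Long =
          Option(logsToBeDeleted.peek()) match {
            case Some((_, scheduleTimeMs)) =>
              scheduleTimeMs + fileDeleteDelayMs - now()
            case None => fileDeleteDelayMs
          }

        var nextDelayMs = nextDeleteDelayMs
        while (nextDelayMs <= 0) {
          val (removedLog, _) = logsToBeDeleted.take()
          removedLog.delete()
          nextDelayMs = nextDeleteDelayMs
        }
        schedule(nextDelayMs) // re-run when the next log becomes eligible
      }
    }

Because the task never sleeps and always reschedules itself, the new test
above can drive it deterministically with MockTime, which is what the
time.sleep calls and the hasLogsToBeDeleted hook are checking.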


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams integration tests hang during shutdown
> ----------------------------------------------
>
>                 Key: KAFKA-6710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6710
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> Builds have been timing out a lot recently, and many of the logs show
> streams integration tests being run but not completed. While running tests
> locally, I saw a failure during shutdown of {{TableTableJoinIntegrationTest}}.
> The test was stuck waiting for a broker to shut down while a
> {{KafkaScheduler}} was attempting to delete logs. KAFKA-6624 (commit
> #1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ inside the
> scheduled delete task to wait until it is time to delete each log segment.
> The failing streams test had 62 logs to delete, and since MockTime doesn't
> get updated during the test, it would have waited 62 minutes to complete,
> blocking shutdown of the broker for that entire time. This is an issue for
> any streams integration test that takes longer than the 30 seconds after
> which the first delayed delete task is scheduled to run.
> Changing _Thread.sleep_ to _time.sleep_ fixes this test issue, but it would
> be good to know why we have a _sleep_ on a _Scheduler_ at all. With the
> default _log.segment.delete.delay.ms_ of one minute, this potentially blocks
> a scheduler thread for up to a minute whenever there are logs to be deleted.
> Couldn't we just break out of the loop if it is not yet time to delete the
> first log segment in the list? The log would then get deleted the next time
> the broker checks. [~junrao] [~lindong] ?
>  
> *Stack trace from failing test*:
> {{"kafka-scheduler-8" daemon prio=5 tid=0x00007fe58dc16800 nid=0x9603 waiting 
> on condition [0x0000700003f25000]}}
> {{   java.lang.Thread.State: TIMED_WAITING (sleeping)}}
> {{        at java.lang.Thread.sleep(Native Method)}}
> {{        at 
> kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}}
> {{        at 
> kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}}
> {{        at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}}
> {{        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}}
> {{        at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}}
> {{        at java.util.concurrent.FutureTask.run(FutureTask.java:262)}}
> {{        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}}
> {{        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}}
> {{        at java.lang.Thread.run(Thread.java:745)}}{{}}
> {{}}{{"Test worker" prio=5 tid=0x00007fe58db72000 nid=0x5203 waiting on 
> condition [0x0000700001cbd000]}}
> {{   java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{        at sun.misc.Unsafe.park(Native Method)}}
> {{        - parking to wait for  <0x0000000780fb8918> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{        at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}}
> {{        at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}}
> {{        at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}}
> {{        at 
> kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}}
> {{        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}}
> {{        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}}
> {{        at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}}
> {{        at org.junit.rules.RunRules.evaluate(RunRules.java:20)}}
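
As context for the first stack trace: under a mocked clock such as Kafka's
MockTime, virtual time advances only when the test calls time.sleep, so a
Thread.sleep inside the scheduled task waits in real time for a virtual
deadline that never arrives on its own. A minimal sketch of that interplay
(SimpleMockTime is a hypothetical simplified stand-in, not Kafka's actual
MockTime class):

    import java.util.concurrent.atomic.AtomicLong

    // Virtual clock: time moves only when sleep() is called.
    class SimpleMockTime(startMs: Long = 0L) {
      private val nowMs = new AtomicLong(startMs)
      def milliseconds(): Long = nowMs.get()
      def sleep(ms: Long): Unit = { nowMs.addAndGet(ms) } // returns instantly
    }

    object SleepContrast extends App {
      val time = new SimpleMockTime()
      val fileDeleteDelayMs = 60 * 1000L
      val scheduleTimeMs = time.milliseconds()

      val waitingTimeMs =
        scheduleTimeMs + fileDeleteDelayMs - time.milliseconds()

      // Pre-patch pattern: Thread.sleep(waitingTimeMs) would block this
      // thread for a real minute per log; with 62 queued logs that is
      // roughly 62 real minutes during which KafkaScheduler.shutdown
      // cannot complete.

      // time.sleep only bumps the virtual clock, so the wait is instant:
      time.sleep(waitingTimeMs)
      assert(time.milliseconds() >= scheduleTimeMs + fileDeleteDelayMs)
      println(s"virtual clock is now at ${time.milliseconds()} ms")
    }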



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
