[ https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396554#comment-16396554 ]

ASF GitHub Bot commented on KAFKA-6624:
---------------------------------------

junrao closed pull request #4663: KAFKA-6624; Prevent concurrent log flush and log deletion
URL: https://github.com/apache/kafka/pull/4663
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9ae93aadf06..7aa5bcd88d8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File],
   // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches up with the current log
   private val futureLogs = new Pool[TopicPartition, Log]()
-  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+  // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
+  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile var currentDefaultConfig = initialDefaultConfig
@@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  private def addLogToBeDeleted(log: Log): Unit = {
+    this.logsToBeDeleted.add((log, time.milliseconds()))
+  }
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File],
       logDirFailureChannel = logDirFailureChannel)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-      this.logsToBeDeleted.add(log)
+      addLogToBeDeleted(log)
     } else {
       val previous = {
         if (log.isFuture)
@@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File],
   private def deleteLogs(): Unit = {
     try {
       while (!logsToBeDeleted.isEmpty) {
-        val removedLog = logsToBeDeleted.take()
+        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
+            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+            if (waitingTimeMs > 0)
+              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File],
         sourceLog.close()
         checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
         checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
-        logsToBeDeleted.add(sourceLog)
+        addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
           // If sourceLog's log directory is offline, we need close its handlers here.
@@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File],
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-      logsToBeDeleted.add(removedLog)
+      addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
       throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> log segment deletion could cause a disk to be marked offline incorrectly
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-6624
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6624
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Jun Rao
>            Assignee: Dong Lin
>            Priority: Major
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time flushing was called. The flush then hit an IOException (the ClosedChannelException above), which caused the disk to be marked offline incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to