[
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981025#comment-13981025
]
Dmitry Bugaychenko commented on KAFKA-1414:
-------------------------------------------
Actually, a similar improvement is needed for shutdown too - on a 20-disk machine
with no forced flush during processing it could take more than 10 minutes to shut
down and flush logs in a single thread. We ended up with the following workaround:
{code}
def shutdown() {
info("Shutting down.")
try {
// stop the cleaner first
if (cleaner != null) {
Utils.swallow(cleaner.shutdown())
}
// Spawn a single shutdown thread for each data dir.
val threadCounter : AtomicInteger = new AtomicInteger()
// TODO: This must be configurable!!!!
val semaphore : Semaphore = new Semaphore(8)
val threads : ArrayBuffer[Thread] = new ArrayBuffer[Thread]()
threads ++= logDirs.map(
x => {
val thread : Thread = new Thread(new Runnable {
override def run(): Unit = {
threadCounter.incrementAndGet()
semaphore.acquire()
try {
val parent : String = x.toString
val thisDirLogs : Seq[(TopicAndPartition,Log)] =
logs.filter(_._2.dir.toString.startsWith(parent)).toSeq
// flush the logs to ensure latest possible recovery point
info("Flushing logs at " + parent)
thisDirLogs.foreach(_._2.flush())
info("Closing logs at " + parent)
// close the logs
thisDirLogs.foreach(_._2.close())
// update the last flush point
info("Updating recovery points " + parent)
recoveryPointCheckpoints(x).write(
thisDirLogs.groupBy(_._1).mapValues(_.head._2.recoveryPoint))
// mark that the shutdown was clean by creating the clean shutdown marker file
info("Writing clean shutdown marker " + parent)
Utils.swallow(new File(parent,
Log.CleanShutdownFile).createNewFile())
} finally {
semaphore.release()
threadCounter.decrementAndGet()
}
}
})
thread.start()
thread
})
// Wait them all
threads.foreach(_.join())
// Check that all threads ended
if (threadCounter.get() > 0) {
error("Not all shut down threads ended.")
}
} finally {
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
}
info("Shutdown complete.")
}
{code}
However, there is evidence that running too many flushing threads at
shutdown can cause the JVM to terminate due to native out of memory:
{code}
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 4088 bytes for AllocateHeap
JVM exited unexpectedly while stopping the application
{code}
> Speedup broker startup after hard reset
> ---------------------------------------
>
> Key: KAFKA-1414
> URL: https://issues.apache.org/jira/browse/KAFKA-1414
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Affects Versions: 0.8.1
> Reporter: Dmitry Bugaychenko
> Assignee: Jay Kreps
>
> After a hard reset due to power failure the broker takes way too much time
> recovering unflushed segments in a single thread. This could be easily
> improved by launching multiple threads (one per data directory, assuming that
> typically each data directory is on a dedicated drive). Locally we tried this
> simple patch to LogManager.loadLogs and it seems to work, however I'm too new
> to Scala, so do not take it literally:
> {code}
> /**
> * Recover and load all logs in the given data directories
> */
> private def loadLogs(dirs: Seq[File]) {
> val threads : Array[Thread] = new Array[Thread](dirs.size)
> var i: Int = 0
> val me = this
> for(dir <- dirs) {
> val thread = new Thread( new Runnable {
> def run()
> {
> val recoveryPoints = me.recoveryPointCheckpoints(dir).read
> /* load the logs */
> val subDirs = dir.listFiles()
> if(subDirs != null) {
> val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
> if(cleanShutDownFile.exists())
> info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
> for(dir <- subDirs) {
> if(dir.isDirectory) {
> info("Loading log '" + dir.getName + "'")
> val topicPartition = Log.parseTopicPartitionName(dir.getName)
> val config = topicConfigs.getOrElse(topicPartition.topic,
> defaultConfig)
> val log = new Log(dir,
> config,
> recoveryPoints.getOrElse(topicPartition, 0L),
> scheduler,
> time)
> val previous = addLogWithLock(topicPartition, log)
> if(previous != null)
> throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
> }
> }
> cleanShutDownFile.delete()
> }
> }
> })
> thread.start()
> threads(i) = thread
> i = i + 1
> }
> for(thread <- threads) {
> thread.join()
> }
> }
> def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
> logCreationOrDeletionLock synchronized {
> this.logs.put(topicPartition, log)
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)