[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Attachment: parallel-dir-loading-trunk.patch parallel-dir-loading-0.8.patch > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Affects Version/s: (was: 0.8.1) 0.9.0 0.8.2 0.8.1.1 Status: Patch Available (was: Open) > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Ozeritskiy updated KAFKA-1414: - Attachment: parallel-dir-loading-trunk-threadpool.patch > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: parallel-dir-loading-trunk-fixed-threadpool.patch Hi. I've checked that {{.getCause()}} returns original exception, added handling of {{ExecutionException}} and replaced scheduled thread pool with fixed one. I've also added {{log.recovery.threads}} property to config. All tests are passing. > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch Sure. [^0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch] > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev1.patch I've updated the patch (and corresponding GitHub issue): * revert {{addLogWithLock()}} change; * pass properties directly to {{LogManager}}; * parallelization during shutdown is now done on per log basis; * move logging during shutdown and recovery to debug level; * rebase trunk. [^KAFKA-1414-rev1.patch] > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev2.patch Thanks for the review. Here's updated [^KAFKA-1414-rev2.patch|patch]. 30. Fixed. 31. Done. I've checked that tests are passing before and after changes to ensure I don't break anything. Note that copy-pasted instances of LogManager seemed to have arbitrary selected timing properties (such as flushCheckpointMs) so I've replaced them with the most common one. It seems to work fine. 32. LogManager 32.1 Done. Both shutdown and recovery parallelization is now done in the same manner at the log level. 33. Done. All jobs are first submitted to pool and then are awaited for directory by directory, to be able to perform necessary clean up afterwards. 34. Done. New property is called `log.io.parallelizm`. > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev2.fixed.patch Small fix: directory unlocking should have been performed only when all jobs are done. [Fixed patch|^KAFKA-1414-rev2.fixed.patch] GitHub issue is also updated. > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: (was: KAFKA-1414-rev2.fixed.patch) > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev2.fixed.patch > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: freebie.patch I would also like to throw in that little [patch|^freebie.patch]. It (arguably) improves and (definitely) simplifies the use of {{Utils.runnable}} helper, which used in my patch and a couple of other places. By changing it's signature we would be able to use it as: {code} Utils.runnable { // do something } {code} instead of: {code} Utils.runnable { () => { // do something }} {code} which will clarify the syntax a bit making it more compelling to use. The patch can be applied directly on top of the primary patch for the task. > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > freebie.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev3.patch OK. Here's a new patch. Everything mentioned in the review is fixed. {{Utils.runnable}} patch is included in that one as well. [^KAFKA-1414-rev3.patch] > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev3-interleaving.patch Log-level parallelization is better in any case, because it's more flexible. As I mentioned, dir-level parallelization can be achieved on top of log-level parallelization by interleaving jobs of every dir. That will make it possible to load logs from all disks at the same time. On the other hand, I second [~jkreps] on testing whether log loading is CPU or IO bound and measuring a speed of start and shutdown with my patch. After all -- it's not the number of disks which are spinning wildly that matters, but the overall time it takes to start/shutdown the system. (Oh, and if we decide to use interleaved jobs strategy -- here's a [patch|^KAFKA-1414-rev3-interleaving.patch] (: ) > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1414: - Fix Version/s: 0.8.2 > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Fix For: 0.8.2 > > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, freebie.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev4.patch I've updated the patch. * replaced {{log.io.parallelism}} with {{log.threads.per.data.dir}}; * added separate thread pools for each directory to simplify thread control; * rebased. [Patch v4|^KAFKA-1414-rev4.patch] > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Fix For: 0.8.2 > > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, > KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev5.patch 50. Used original approach, as seen before patching. Fixed everything else. [Updated patch|^KAFKA-1414-rev5.patch] > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 >Reporter: Dmitry Bugaychenko >Assignee: Jay Kreps > Fix For: 0.8.2 > > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, > KAFKA-1414-rev4.patch, KAFKA-1414-rev5.patch, freebie.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1414: --- Resolution: Fixed Assignee: Anton Karamanov (was: Jay Kreps) Status: Resolved (was: Patch Available) Thanks for patch v5. +1 and committed to trunk. > Speedup broker startup after hard reset > --- > > Key: KAFKA-1414 > URL: https://issues.apache.org/jira/browse/KAFKA-1414 > Project: Kafka > Issue Type: Improvement > Components: log >Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 >Reporter: Dmitry Bugaychenko >Assignee: Anton Karamanov > Fix For: 0.8.2 > > Attachments: > 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, > KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, > KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, > KAFKA-1414-rev4.patch, KAFKA-1414-rev5.patch, freebie.patch, > parallel-dir-loading-0.8.patch, > parallel-dir-loading-trunk-fixed-threadpool.patch, > parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch > > > After hard reset due to power failure broker takes way too much time > recovering unflushed segments in a single thread. This could be easiliy > improved launching multiple threads (one per data dirrectory, assuming that > typically each data directory is on a dedicated drive). Localy we trie this > simple patch to LogManager.loadLogs and it seems to work, however I'm too new > to scala, so do not take it literally: > {code} > /** >* Recover and load all logs in the given data directories >*/ > private def loadLogs(dirs: Seq[File]) { > val threads : Array[Thread] = new Array[Thread](dirs.size) > var i: Int = 0 > val me = this > for(dir <- dirs) { > val thread = new Thread( new Runnable { > def run() > { > val recoveryPoints = me.recoveryPointCheckpoints(dir).read > /* load the logs */ > val subDirs = dir.listFiles() > if(subDirs != null) { > val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) > if(cleanShutDownFile.exists()) > info("Found clean shutdown file. Skipping recovery for all logs > in data directory '%s'".format(dir.getAbsolutePath)) > for(dir <- subDirs) { > if(dir.isDirectory) { > info("Loading log '" + dir.getName + "'") > val topicPartition = Log.parseTopicPartitionName(dir.getName) > val config = topicConfigs.getOrElse(topicPartition.topic, > defaultConfig) > val log = new Log(dir, > config, > recoveryPoints.getOrElse(topicPartition, 0L), > scheduler, > time) > val previous = addLogWithLock(topicPartition, log) > if(previous != null) > throw new IllegalArgumentException("Duplicate log > directories found: %s, %s!".format(log.dir.getAbsolutePath, > previous.dir.getAbsolutePath)) > } > } > cleanShutDownFile.delete() > } > } > }) > thread.start() > threads(i) = thread > i = i + 1 > } > for(thread <- threads) { > thread.join() > } > } > def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = { > logCreationOrDeletionLock synchronized { > this.logs.put(topicPartition, log) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)