[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup
[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198493#comment-16198493 ] Ismael Juma commented on KAFKA-3359:

[~bobrik], it shouldn't read all of the data in the partition. You may have run into https://github.com/apache/kafka/commit/91517e8fbd7767ba6d7f43b517f5a26b6f870585

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Affects Versions: 0.8.2.2, 0.9.0.1
> Reporter: Vamsi Subhash Achanta
> Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded sequentially after an unclean shutdown. Loading can take a long time because logSegment.recover(..) is called for every segment; for brokers with many partitions, the total time is very high (we have noticed ~40 minutes for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request makes log-segment loading parallel, with two configurable properties: "log.recovery.threads" and "log.recovery.max.interval.ms".
> Logic:
> 1. Create a fixed-size thread pool of log.recovery.threads threads.
> 2. Submit each logSegment recovery as a job to the thread pool and add the returned future to a job list.
> 3. Wait until all the jobs are done, within the required time (log.recovery.max.interval.ms - default set to Long.MAX_VALUE).
> 4. If the jobs are done and the futures all returned null (meaning the jobs completed successfully), recovery is considered done.
> 5. If any of the recovery jobs failed, the failure is logged and LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy to modify the code per review changes.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
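The numbered steps in the proposal above could be sketched roughly as follows. This is a minimal, hypothetical Java sketch of the described thread-pool logic, not the actual PR code; `LogRecoveryFailedException`, the `recoverAll` method, and the segment-recovery `Runnable`s are illustrative stand-ins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelRecoverySketch {

    // Stand-in for the LogRecoveryFailedException named in the proposal.
    static class LogRecoveryFailedException extends RuntimeException {
        LogRecoveryFailedException(String msg, Throwable cause) { super(msg, cause); }
        LogRecoveryFailedException(String msg) { super(msg); }
    }

    // threads      ~ "log.recovery.threads"
    // maxIntervalMs ~ "log.recovery.max.interval.ms"
    static void recoverAll(List<Runnable> segmentRecoveries, int threads, long maxIntervalMs) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);   // step 1
        List<Future<?>> jobs = new ArrayList<>();
        for (Runnable recovery : segmentRecoveries) {
            jobs.add(pool.submit(recovery));                            // step 2
        }
        pool.shutdown();
        try {
            // step 3: wait for all jobs within the configured interval
            if (!pool.awaitTermination(maxIntervalMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
                throw new LogRecoveryFailedException(                   // step 6
                        "recovery timed out after " + maxIntervalMs + " ms");
            }
            for (Future<?> job : jobs) {
                // step 4: get() returns null for a completed Runnable;
                // step 5: an ExecutionException here means the job failed
                job.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new LogRecoveryFailedException("segment recovery failed", e);
        }
    }

    public static void main(String[] args) {
        List<Runnable> segments = List.of(
                () -> System.out.println("recovered segment 0"),
                () -> System.out.println("recovered segment 1"));
        recoverAll(segments, 2, Long.MAX_VALUE);
        System.out.println("done");
    }
}
```

With the default of one thread this degenerates to the sequential behavior, which is what makes the proposal backward compatible.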
[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup
[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16195247#comment-16195247 ] Ivan Babrou commented on KAFKA-3359:

[~ijuma], good to know, I'll bump the setting for our cluster. Is there any reason to read all data from the partition during recovery?
[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup
[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192787#comment-16192787 ] Ismael Juma commented on KAFKA-3359:

[~bobrik], do you have a single partition? Multiple partitions/logs are loaded in parallel. It's just the segments within a log that are loaded sequentially. Take a look at: "num.recovery.threads.per.data.dir".
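For reference, the per-data-dir knob mentioned above is a broker setting in server.properties. A sketch of how it might be raised for a broker with a single data directory (the value 8 is just an example, not a recommendation):

```properties
# Number of threads per log data directory, used for log recovery at
# startup and flushing at shutdown. Defaults to 1.
num.recovery.threads.per.data.dir=8
```

Note that this parallelizes across logs within a data directory; it does not parallelize the recovery of segments within a single log, which is what this ticket asks for.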
[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup
[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192405#comment-16192405 ] Ivan Babrou commented on KAFKA-3359:

It's me again. We hit the issue again and googling led me back to this ticket. I wanted to add that Kafka re-reads full partitions to recover, and it takes 20 minutes even on the smallest ones, which are around 1.5TB:

{noformat}
Oct 05 01:10:43 mybroker14 kafka[32940]: WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/state/kafka/http/requests-47/0001246285678992.index) has non-zero size but the last offset is 1246285678992 which is no larger than the base offset 1246285678992.}. deleting /state/kafka/http/requests-47/0001246285678992.timeindex, /state/kafka/http/requests-47/0001246285678992.index, and /state/kafka/http/requests-47/0001246285678992.txnindex and rebuilding index... (kafka.log.Log)
Oct 05 01:10:43 mybroker14 kafka[32940]: INFO Loading producer state from snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:10:47 mybroker14 kafka[32940]: INFO Recovering unflushed segment 1246283087840 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Recovering unflushed segment 1246284384425 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Loading producer state from snapshot file '/state/kafka/http/requests-47/0001246283087840.snapshot' for partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Recovering unflushed segment 1246285678992 in log requests-47. (kafka.log.Log)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Loading producer state from snapshot file '/state/kafka/http/requests-47/0001246284384425.snapshot' for partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from offset 1246286680535 for partition requests-47 with message format version 0 (kafka.log.Log)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:43 mybroker14 kafka[32940]: INFO Completed load of log requests-47 with 719 log segments, log start offset 1245351135299 and log end offset 1246286680535 in 1260684 ms (kafka.log.Log)
{noformat}