[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-10 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198493#comment-16198493 ]

Ismael Juma commented on KAFKA-3359:


[~bobrik], it shouldn't read all data from the partition. You may have run into the issue fixed by 
https://github.com/apache/kafka/commit/91517e8fbd7767ba6d7f43b517f5a26b6f870585

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>
> Currently, on startup the log segments within a logDir are loaded 
> sequentially when there has been an unclean shutdown. Loading the segments 
> takes a long time because logSegment.recover(..) is called for every 
> segment, and for brokers with many partitions the total time is very high 
> (we have seen ~40 mins for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request makes the log-segment load parallel, with two 
> configurable properties: "log.recovery.threads" and 
> "log.recovery.max.interval.ms".
> Logic:
> 1. Create a thread pool of fixed size (log.recovery.threads).
> 2. Submit each logSegment recovery as a job to the thread pool and add the 
> returned future to a job list.
> 3. Wait until all the jobs are done, within the allowed time 
> (log.recovery.max.interval.ms - default set to Long.Max).
> 4. If they are done and the futures are all null (meaning that the jobs 
> completed successfully), recovery is considered done.
> 5. If any of the recovery jobs failed, it is logged and 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to modify the code per review comments.
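The six numbered steps above can be sketched roughly as follows. This is a hedged, deliberately Java-ish illustration (matching the author's own description of the PR), not the actual PR code: the `SegmentRecovery` interface and `recoverAll` are invented here for illustration; only `LogRecoveryFailedException` and the two config names come from the ticket text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Exception name taken from the ticket's steps 5 and 6.
class LogRecoveryFailedException extends RuntimeException {
    LogRecoveryFailedException(String msg) { super(msg); }
    LogRecoveryFailedException(String msg, Throwable cause) { super(msg, cause); }
}

// Hypothetical hook standing in for logSegment.recover(..).
interface SegmentRecovery { void recover(String segment) throws Exception; }

class ParallelRecovery {
    static void recoverAll(List<String> segments,
                           int recoveryThreads,   // "log.recovery.threads"
                           long maxIntervalMs,    // "log.recovery.max.interval.ms"
                           SegmentRecovery recovery) {
        // Step 1: fixed-size thread pool.
        ExecutorService pool = Executors.newFixedThreadPool(recoveryThreads);
        try {
            // Step 2: wrap each segment recovery as a job.
            List<Callable<Void>> jobs = new ArrayList<>();
            for (String seg : segments) {
                jobs.add(() -> { recovery.recover(seg); return null; });
            }
            // Step 3: invokeAll blocks until every job finishes or the
            // timeout expires; unfinished jobs are cancelled at the deadline.
            List<Future<Void>> futures =
                pool.invokeAll(jobs, maxIntervalMs, TimeUnit.MILLISECONDS);
            for (Future<Void> f : futures) {
                if (f.isCancelled())   // step 6: timeout reached
                    throw new LogRecoveryFailedException("log recovery timed out");
                try {
                    f.get();           // step 4: null result means success
                } catch (ExecutionException e) {
                    // Step 5: a recovery job failed.
                    throw new LogRecoveryFailedException("log recovery failed", e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new LogRecoveryFailedException("interrupted during recovery", e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new LogRecoveryFailedException("interrupted during recovery", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With `recoveryThreads = 1` this degenerates to sequential recovery, which is how the proposal stays backward compatible.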



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-06 Thread Ivan Babrou (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195247#comment-16195247 ]

Ivan Babrou commented on KAFKA-3359:


[~ijuma], good to know, I'll bump the setting for our cluster. Is there any 
reason to read all data from the partition during recovery?



[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-05 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192787#comment-16192787 ]

Ismael Juma commented on KAFKA-3359:


[~bobrik], do you have a single partition? Multiple partitions/logs are loaded 
in parallel. It's just the segments within a log that are loaded sequentially. 
Take a look at: "num.recovery.threads.per.data.dir".
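For reference, that existing setting is raised in server.properties; it controls how many threads each log directory uses for recovery at startup after an unclean shutdown (the default is 1, and the value below is only an example to be tuned to your disk and core count):

```properties
# Threads per data directory used for log recovery at startup
# and flushing at shutdown.
num.recovery.threads.per.data.dir=8
```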



[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-04 Thread Ivan Babrou (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192405#comment-16192405 ]

Ivan Babrou commented on KAFKA-3359:


It's me again. We hit the issue again and searching led me back to this 
ticket. I wanted to add that Kafka re-reads full partitions to recover, and 
it takes 20 minutes even on our smallest ones, which are around 1.5 TB:

{noformat}
Oct 05 01:10:43 mybroker14 kafka[32940]: WARN Found a corrupted index file due 
to requirement failed: Corrupt index found, index file 
(/state/kafka/http/requests-47/0001246285678992.index) has non-zero size 
but the last offset is 1246285678992 which is no larger than the base offset 
1246285678992.}. deleting 
/state/kafka/http/requests-47/0001246285678992.timeindex, 
/state/kafka/http/requests-47/0001246285678992.index, and 
/state/kafka/http/requests-47/0001246285678992.txnindex and rebuilding 
index... (kafka.log.Log)
Oct 05 01:10:43 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:10:47 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246283087840 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246284384425 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246283087840.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246285678992 in log requests-47. (kafka.log.Log)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246284384425.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
offset 1246286680535 for partition requests-47 with message format version 0 
(kafka.log.Log)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:43 mybroker14 kafka[32940]: INFO Completed load of log requests-47 
with 719 log segments, log start offset 1245351135299 and log end offset 
1246286680535 in 1260684 ms (kafka.log.Log)
{noformat}
