[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219271#comment-15219271
 ] 

ASF GitHub Bot commented on KAFKA-3471:
---

Github user kawamuray closed the pull request at:

https://github.com/apache/kafka/pull/1146


> min.insync.replicas isn't respected when there's a delaying follower who 
> still in ISR
> -
>
> Key: KAFKA-3471
> URL: https://issues.apache.org/jira/browse/KAFKA-3471
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Yuto Kawamura
>
> h2. tl;dr
> Partition.checkEnoughReplicasReachOffset should look at the number of followers 
> that have already caught up to requiredOffset, instead of the high watermark, 
> when deciding whether enough replicas have acknowledged a produce request.
> h2. Description
> Just recently I found an interesting metric on our Kafka cluster.
> During peak time, the number of produce requests dropped significantly on a 
> single broker only. Let's say this broker's id=1.
> - broker-1 holds leadership for 3 partitions of topic T.
> - Each producer is configured with acks=all.
> - broker-1 hosts several topics; each topic is configured with 3 replicas and 
> min.insync.replicas=2.
> - Partition 1 of topic T (T-1) has 3 replicas: broker-1 (leader), 
> broker-2 (follower-A), broker-3 (follower-B).
> Looking at the logs of broker-1, there were lots of lines indicating that the 
> ISR for T-1 was expanding and shrinking frequently.
> After investigating for a while, we restarted broker-1 and, unexpectedly, the 
> continuous ISR expand/shrink went away.
> Since this is most likely a state-corruption issue (it was fixed by a simple 
> restart) and it has never reproduced since the broker restart, we unfortunately 
> lost the clue to what was actually happening, so to this day I do not know the 
> cause of that phenomenon.
> In any case, we kept investigating why frequent ISR shrink/expand reduces the 
> number of produce requests, and we found that the Kafka broker does not appear 
> to respect min.insync.replicas in the way the documentation of this config 
> describes.
> Here's the scenario:
> 1. Everything is working well.
>    ISR(=LogEndOffset): leader=3, follower-A=2, follower-B=2
>    HW: 2
> 2. A producer client produces some records. For simplicity it sends only one 
> record, so the LogEndOffset is updated to 4 and the request is put into 
> purgatory, since it has requiredOffset=4 while the HW stays at 2.
>    ISR(=LogEndOffset): leader=4, follower-A=2, follower-B=2
>    HW: 2
> 3. follower-A performs a fetch and updates its LogEndOffset to 4. IIUC, the 
> request received at step 2 could be considered successful at this point, since 
> it only requires 2 out of 3 replicas to be in sync (min.insync.replicas=2, 
> acks=all), but it is not under the current implementation because of the HW (ref: 
> https://github.com/apache/kafka/blob/1fbe445dde71df0023a978c5e54dd229d3d23e1b/core/src/main/scala/kafka/cluster/Partition.scala#L325).
>    ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
>    HW: 2
> 4. For some reason, follower-B cannot fetch for a while. At this moment 
> follower-B is still in the ISR because of replica.lag.time.max.ms, meaning it 
> still holds the HW back.
>    ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
>    HW: 2
> 5. The produce request received at step 2 times out and is considered failed, 
> and the client retries. No incoming requests for T-1 can succeed during this 
> period.
>    ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
>    HW: 2
> 6. The leader decides to drop follower-B from the ISR because of 
> replica.lag.time.max.ms. The HW advances to 4 and all produce requests can now 
> be processed successfully.
>    ISR(=LogEndOffset): leader=4, follower-A=4
>    HW: 4
> 7. After a while follower-B comes back and catches up to the LogEndOffset, so 
> the leader lets it back into the ISR. The situation returns to step 1 and the 
> cycle continues.
> So my understanding is that records on a producer accumulate into a single 
> batch while produce requests for T-1 are blocked (and retried) during steps 
> 4-6, and that is why the total number of requests decreased significantly on 
> broker-1 while the total number of messages stayed the same.
> As I noted in step 3, the leader should consider a produce request successful 
> once it has confirmed min.insync.replicas acknowledgements, so the current 
> implementation, which makes produce requests dependent on the HW, does not 
> make sense IMO.
> When I observed this scenario our Kafka cluster was running version 0.8.2.1, 
> but I confirmed that it can still happen with a build from the latest version 
> of trunk.
> Actually I still don't understand whether this is intentional behavior or 
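
To make the distinction concrete, here is a minimal, self-contained Scala sketch. It is illustrative only: Replica, ackedByHighWatermark, and ackedByCaughtUpCount are hypothetical names, not the actual Partition.checkEnoughReplicasReachOffset code. It applies both checks to the step-3 state from the scenario above.

{code:scala}
object MinIsrCheckSketch {
  // Minimal model of a partition replica: broker id and its log end offset.
  final case class Replica(brokerId: Int, logEndOffset: Long)

  // Current behaviour (simplified): the request at requiredOffset is only
  // acknowledged once the high watermark has reached it. The HW is the minimum
  // log end offset across the ISR, so one lagging ISR member holds back every
  // pending request.
  def ackedByHighWatermark(isr: Seq[Replica], requiredOffset: Long): Boolean = {
    val highWatermark = isr.map(_.logEndOffset).min
    highWatermark >= requiredOffset
  }

  // Reporter's proposal (simplified): acknowledge as soon as at least
  // min.insync.replicas members of the ISR have caught up to requiredOffset.
  def ackedByCaughtUpCount(isr: Seq[Replica], requiredOffset: Long, minIsr: Int): Boolean =
    isr.count(_.logEndOffset >= requiredOffset) >= minIsr

  def main(args: Array[String]): Unit = {
    // Step-3 state: leader and follower-A are at offset 4, follower-B is stuck
    // at 2 but still in the ISR, so the HW stays at 2.
    val isr = Seq(Replica(1, 4L), Replica(2, 4L), Replica(3, 2L))
    val requiredOffset = 4L
    println(ackedByHighWatermark(isr, requiredOffset))             // false -> request waits in purgatory
    println(ackedByCaughtUpCount(isr, requiredOffset, minIsr = 2)) // true  -> would be acknowledged
  }
}
{code}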

[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-30 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219267#comment-15219267
 ] 

Yuto Kawamura commented on KAFKA-3471:
--

Thank you very much for your advice. It was very helpful for understanding why 
produce requests currently depend on the HW.
Let me close this ticket and the corresponding PR as well.


[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-29 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15217295#comment-15217295
 ] 

Jiangjie Qin commented on KAFKA-3471:
-

I think it depends on what you actually want. Shortening 
replica.lag.time.max.ms will let the leader kick the slow broker out of the ISR 
more quickly, but it might increase under-replicated-partition churn. It seems 
we usually want the producer to handle it, so tuning the buffer size, retries, 
and retry backoff ms seems like a reasonable solution. It allows the producer 
to buffer the messages, making it more resilient to such broker-side issues.
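
For concreteness, here is a hedged sketch of the kind of producer tuning being suggested. The broker address, topic name, and all values are placeholders, not recommendations; the config keys are the standard Java producer settings of that era.

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ResilientProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092") // placeholder address
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    // A larger buffer lets the producer keep accumulating records while T-1 is blocked.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (64L * 1024 * 1024).toString)
    // Retry long enough to outlive the ISR shrink triggered by replica.lag.time.max.ms.
    props.put(ProducerConfig.RETRIES_CONFIG, "10")
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("T", "key", "value"))
    producer.close()
  }
}
{code}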


[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-28 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215406#comment-15215406
 ] 

Yuto Kawamura commented on KAFKA-3471:
--

[~becket_qin] Ah.. I wasn't aware of Kafka's leader election strategy and just 
assumed that it chooses the follower with the highest LogEndOffset.
After reading through the code around the controller, I now understand that it 
simply takes the head element of the live ISR, regardless of how far each 
follower has caught up: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala#L87

So the only way to prevent produce requests from being blocked and dropped 
during steps 4-6 is to shorten replica.lag.time.max.ms to a reasonably small 
value and then adjust the producer's buffer size and retry count (and/or 
backoff) so it keeps more records until the ISR shrinks. Am I correct?
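
As a rough paraphrase of the rule being described (this is not the controller code itself, and selectLeader is a hypothetical name), the selection ignores log end offsets entirely:

{code:scala}
object LeaderSelectionSketch {
  // Pick the first assigned replica that is both alive and in the ISR,
  // regardless of how far its log end offset has caught up.
  def selectLeader(assignedReplicas: Seq[Int], liveBrokers: Set[Int], isr: Set[Int]): Option[Int] =
    assignedReplicas.find(id => liveBrokers.contains(id) && isr.contains(id))

  def main(args: Array[String]): Unit = {
    // If broker-1 (the old leader) dies while both followers are still in the ISR,
    // broker-2 is chosen simply because it comes first in the assignment order.
    println(selectLeader(Seq(1, 2, 3), liveBrokers = Set(2, 3), isr = Set(2, 3))) // Some(2)
  }
}
{code}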



[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-28 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15214698#comment-15214698
 ] 

Jiangjie Qin commented on KAFKA-3471:
-

[~kawamuray] In your example, if we return the produce response at step 3 and 
the leader goes down before it shrinks the ISR and kicks follower-B out, 
follower-B might be elected as leader. In that case, because follower-B does 
not have the message produced at step 2, that message would be lost, right?
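
To spell out that failure mode, here is a small hedged sketch continuing the simplified model used earlier (hypothetical names, not Kafka code): if the acknowledgement were sent at step 3 and the leader then failed while follower-B was still in the ISR, electing follower-B would drop the acknowledged record.

{code:scala}
object AckBeforeHwLossSketch {
  // Same simplified replica model: broker id and log end offset.
  final case class Replica(brokerId: Int, logEndOffset: Long)

  def main(args: Array[String]): Unit = {
    // Step-3 state: the record with requiredOffset=4 exists only on brokers 1 and 2,
    // but all three replicas are still in the ISR.
    val requiredOffset = 4L
    val isr = Seq(Replica(1, 4L), Replica(2, 4L), Replica(3, 2L))

    // Suppose the leader had already acked at step 3 and then crashed.
    // Any surviving ISR member may become leader, including follower-B (broker-3).
    val survivors = isr.filterNot(_.brokerId == 1)
    survivors.foreach { candidate =>
      val ackedRecordLost = candidate.logEndOffset < requiredOffset
      println(s"new leader = broker-${candidate.brokerId}, acked record lost? $ackedRecordLost")
    }
  }
}
{code}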


[jira] [Commented] (KAFKA-3471) min.insync.replicas isn't respected when there's a delaying follower who still in ISR

2016-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213133#comment-15213133
 ] 

ASF GitHub Bot commented on KAFKA-3471:
---

GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1146

KAFKA-3471: min.insync.replicas isn't respected when there's a delaying 
follower who still in ISR

Ticket: https://issues.apache.org/jira/browse/KAFKA-3471

The number of followers that have already caught up to requiredOffset should be 
used, instead of the high watermark, to decide whether enough replicas have 
acknowledged a produce request.
Please see the ticket for the details.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka issue/KAFKA-3471-minISR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1146.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1146


commit a784340b3876377894db25987659408779fec7dd
Author: Yuto Kawamura 
Date:   2016-03-26T17:14:36Z

KAFKA-3471: Add tests for Partition.checkEnoughReplicasReachOffset

At the moment of this commit, some of the test cases fail, but that is expected.
The next commit follows up to fix checkEnoughReplicasReachOffset.

commit cc96ab952165afc4652ae628e5489c911b755ab6
Author: Yuto Kawamura 
Date:   2016-03-26T17:18:32Z

KAFKA-3471: Fix checkEnoughReplicasReachOffset to respect min.insync.replicas

The number of followers that have already caught up to requiredOffset should be
used, instead of the high watermark, to decide whether enough replicas have
acknowledged a produce request.
Please see the ticket for the details.



