[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-31 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17120656#comment-17120656
 ] 

Hai Lin commented on KAFKA-9987:


Thanks [~ableegoldman], I will take a look at the PR. Do you have a time frame 
for the next 2.4 release?

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case that 
> individual consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription; rather, they will all be subscribed to the 
> same set of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 
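
To illustrate the constrained case described above, here is a minimal sketch (in plain 
Java, with made-up class and method names rather than the actual AbstractStickyAssignor 
code) of the kind of check that could decide whether every member shares the same 
subscription, so that a cheaper assignment path can be taken:

{code:java}
import java.util.*;

// Illustrative sketch only: detect the common case where every member of the group
// subscribes to the same set of topics, so a much cheaper "constrained" assignment
// can be used instead of the general sticky algorithm.
public class SubscriptionCheck {

    // memberSubscriptions: memberId -> topics that member subscribed to
    static boolean allMembersSubscribedToSameTopics(Map<String, List<String>> memberSubscriptions) {
        Set<String> first = null;
        for (List<String> topics : memberSubscriptions.values()) {
            Set<String> current = new HashSet<>(topics);
            if (first == null) {
                first = current;                 // remember the first member's subscription
            } else if (!first.equals(current)) {
                return false;                    // subscriptions differ -> use the general algorithm
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("consumer-1", Arrays.asList("clicks", "views"));
        subs.put("consumer-2", Arrays.asList("views", "clicks"));
        // prints true, so the constrained (cheap) assignment path could be taken
        System.out.println(allMembersSubscribedToSameTopics(subs));
    }
}
{code}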



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10056) Consumer metadata may use outdated groupSubscription that doesn't contain newly subscribed topics

2020-05-29 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119775#comment-17119775
 ] 

Hai Lin commented on KAFKA-10056:
-

Thanks, much appreciated. 

> Consumer metadata may use outdated groupSubscription that doesn't contain 
> newly subscribed topics
> -
>
> Key: KAFKA-10056
> URL: https://issues.apache.org/jira/browse/KAFKA-10056
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> From [~hai_lin] in KAFKA-9181:
> I did notice an issue after this patch; here is what I observe.
> Consumer metadata might skip the first metadata update because groupSubscription 
> is not reset. In my case, the consumer coordinator thread hijacks the update by 
> calling newMetadataRequestAndVersion with an outdated groupSubscription before 
> joinPrepare() happens. The groupSubscription will be reset later and the 
> metadata will eventually be updated, so this is not an issue for the initial 
> subscribe (since groupSubscription is empty anyway), but it can happen on a 
> subsequent subscribe when groupSubscription is not empty. This creates a 
> discrepancy between subscription and groupSubscription: if any new metadata 
> request happens in between, metadataTopics will return outdated group 
> information.
>  
> h4. The happy path
>  * Consumer calls subscribe -> update {{needUpdated}}, bump up 
> {{requestVersion}} and update {{subscription}} in {{SubscriptionState}} -> 
> {{joinPrepare()}} is called in the first {{poll()}} to reset 
> {{groupSubscription}} -> the next time a metadata update is requested, 
> {{metadataTopics()}} returns {{subscription}} since {{groupSubscription}} is 
> empty -> an update request is issued to the broker to fetch partition 
> information for the new topic.
> h4. In our case
>  * Consumer calls subscribe -> update {{needUpdated}}, bump up 
> {{requestVersion}} and update {{subscription}} (but not {{groupSubscription}}) 
> in {{SubscriptionState}} -> the consumer coordinator heartbeat thread issues a 
> metadata request and {{SubscriptionState}} hands out the current 
> {{requestVersion}} together with the outdated {{groupSubscription}} -> the 
> metadata update request is sent with the outdated subscription -> the response 
> comes back to the client and, since {{requestVersion}} is up to date, it resets 
> the {{needUpdated}} flag -> {{joinPrepare()}} is called and resets 
> {{groupSubscription}} -> no new metadata update request follows because 
> {{needUpdated}} was reset -> the next metadata request only happens when 
> {{metadata.max.age}} is reached.
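
The sequence above can be replayed with a small, single-threaded model. This is only an 
illustrative sketch that mirrors the steps in the comment, not the real SubscriptionState 
or ConsumerCoordinator code; all field and method names here are stand-ins:

{code:java}
import java.util.*;

// Toy model of the described race: a metadata request issued from the heartbeat path
// snapshots the outdated groupSubscription, and its response clears the update flag
// before joinPrepare() has reset groupSubscription.
public class OutdatedGroupSubscriptionModel {
    Set<String> subscription = new HashSet<>();
    Set<String> groupSubscription = new HashSet<>();
    boolean needUpdated = false;
    int requestVersion = 0;

    void subscribe(Set<String> topics) {          // consumer.subscribe(...)
        subscription = new HashSet<>(topics);
        needUpdated = true;
        requestVersion++;                         // version bumped, groupSubscription untouched
    }

    Set<String> metadataTopics() {                // topics a metadata request would ask for
        return groupSubscription.isEmpty() ? subscription : groupSubscription;
    }

    void handleMetadataResponse(int versionOfRequest) {
        if (versionOfRequest >= requestVersion)
            needUpdated = false;                  // version is "up to latest" -> flag cleared
    }

    void joinPrepare() {                          // reset of groupSubscription before rejoining
        groupSubscription.clear();
    }

    public static void main(String[] args) {
        OutdatedGroupSubscriptionModel state = new OutdatedGroupSubscriptionModel();

        // Happy path: initial subscribe while groupSubscription is still empty
        state.subscribe(new HashSet<>(Arrays.asList("topicA")));
        System.out.println("initial request: " + state.metadataTopics());   // [topicA]
        state.handleMetadataResponse(state.requestVersion);                 // first update completes
        state.groupSubscription = new HashSet<>(Arrays.asList("topicA"));   // group-wide view after rebalance

        // Problem path: a later subscribe adds topicB while groupSubscription is non-empty
        state.subscribe(new HashSet<>(Arrays.asList("topicA", "topicB")));
        int version = state.requestVersion;
        System.out.println("heartbeat request: " + state.metadataTopics()); // still [topicA] -> outdated
        state.handleMetadataResponse(version);                              // clears needUpdated
        state.joinPrepare();                                                // reset happens too late
        System.out.println("needUpdated=" + state.needUpdated
                + " -> topicB has to wait for metadata.max.age.ms");
    }
}
{code}

Running the model prints a second metadata request that still only contains topicA and 
ends with needUpdated=false, matching the behaviour described above.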



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10056) Consumer metadata may use outdated groupSubscription that doesn't contain newly subscribed topics

2020-05-28 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119015#comment-17119015
 ] 

Hai Lin commented on KAFKA-10056:
-

Thanks for the patch, [~rsivaram]. Are we going to backport this to 2.4? I was 
running 2.4.1 when I ran into this issue.

> Consumer metadata may use outdated groupSubscription that doesn't contain 
> newly subscribed topics
> -
>
> Key: KAFKA-10056
> URL: https://issues.apache.org/jira/browse/KAFKA-10056
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> From [~hai_lin] in KAFKA-9181:
> I did notice an issue after this patch; here is what I observe.
> Consumer metadata might skip the first metadata update because groupSubscription 
> is not reset. In my case, the consumer coordinator thread hijacks the update by 
> calling newMetadataRequestAndVersion with an outdated groupSubscription before 
> joinPrepare() happens. The groupSubscription will be reset later and the 
> metadata will eventually be updated, so this is not an issue for the initial 
> subscribe (since groupSubscription is empty anyway), but it can happen on a 
> subsequent subscribe when groupSubscription is not empty. This creates a 
> discrepancy between subscription and groupSubscription: if any new metadata 
> request happens in between, metadataTopics will return outdated group 
> information.
>  
> h4. The happy path
>  * Consumer calls subscribe -> update {{needUpdated}}, bump up 
> {{requestVersion}} and update {{subscription}} in {{SubscriptionState}} -> 
> {{joinPrepare()}} is called in the first {{poll()}} to reset 
> {{groupSubscription}} -> the next time a metadata update is requested, 
> {{metadataTopics()}} returns {{subscription}} since {{groupSubscription}} is 
> empty -> an update request is issued to the broker to fetch partition 
> information for the new topic.
> h4. In our case
>  * Consumer calls subscribe -> update {{needUpdated}}, bump up 
> {{requestVersion}} and update {{subscription}} (but not {{groupSubscription}}) 
> in {{SubscriptionState}} -> the consumer coordinator heartbeat thread issues a 
> metadata request and {{SubscriptionState}} hands out the current 
> {{requestVersion}} together with the outdated {{groupSubscription}} -> the 
> metadata update request is sent with the outdated subscription -> the response 
> comes back to the client and, since {{requestVersion}} is up to date, it resets 
> the {{needUpdated}} flag -> {{joinPrepare()}} is called and resets 
> {{groupSubscription}} -> no new metadata update request follows because 
> {{needUpdated}} was reset -> the next metadata request only happens when 
> {{metadata.max.age}} is reached.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112508#comment-17112508
 ] 

Hai Lin edited comment on KAFKA-4084 at 5/20/20, 6:18 PM:
--

Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want to 
hear a bit more about why KIP-491 is not under consideration, based on your 
comment above:

{quote}I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it.{quote}

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up:

Uneven partition sizes are very common in production, and with throttling some 
big partitions take much longer to get fully replicated. Sometimes we just want a 
fully replicated broker (say within 10 minutes, rather than hours). Keeping an 
under-replicated broker in the system for a long time adds operational 
complexity; for example, we need to be careful that no other broker goes offline 
during the replication process.

2. Other situations, such as an outlier broker:

This happens pretty often if the cluster is big, and most of the time it's not 
easy (or at least it's time consuming) to replace a broker, even with EBS. We 
would like to disable a broker as leader without taking it offline, so the 
on-call has time to investigate the problem without terminating it right away. 
With KIP-491 we could add a lot of automation to the system to handle, say, a 
network partition affecting a single broker without actually replacing it.

3. Potential:

If we can manipulate the view of leadership in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers can 
now consume from replicas, but I think there is still a way we can control it). 
Then we can add priority at the client level and isolate clients so they talk to 
only some of the brokers.

This discussion is more about KIP-491; we can surely move it back to the original 
ticket if we feel there is more to discuss there.

 


was (Author: hai_lin):
Thanks [~sql_consulting] point me to this ticket. [~junrao] Just want to hear a 
bit more about why KIP-491 is not in consideration based on your comment above.

{*quote*}I am not sure if KIP-491 is necessarily the best approach to address 
this particular issue (in general, one probably shouldn't have any broker 
overloaded at any time). However, if there are other convincing use cases, we 
could consider it. {*quote*}

I feel it's a very useful feature for a lot of operation cases:

 

1. For high replica rate when broker boot up:

To me uneven size of partition on production is very command, with throttle 
some big partitions will take much longer to get fully replicated. Sometimes we 
just want a fully replica broker(like in 10 minutes without replica rather than 
hours). A long time with under replica broker in the system add more complicity 
for operation. For example, we need to be careful there is no other broker is 
offline during the replicating process.

 

2 Other situation like outlier broker

This happen pretty often if the cluster is big, most of the time it's not 
easy(at least time consuming) to replace broker even with EBS. We would like to 
disable a broker as leader but not take it offline. So the on-call have time to 
investigate the problem without terminate it right away. With KIP-491 we can 
add a lot of automation to the system that handle some network partition for a 
single broker without actually replace it.

 

3 Potential

If we can manipulate the view of leader in a cluster, we can do a bit more like 
introduce different leader for producer and consumer(consumer now can consumer 
from replica but I think there is still way we can control it). Then we can add 
priority to the client level and isolate client to talk only some of the 
brokers. 

 

This is more for KIP-491, we can surely move it back to the original ticket if 
we feel there is more discussion for this.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` 

[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112508#comment-17112508
 ] 

Hai Lin edited comment on KAFKA-4084 at 5/20/20, 6:18 PM:
--

Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want to 
hear a bit more about why KIP-491 is not under consideration, based on your 
comment above:

{quote}I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it.{quote}

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up:

Uneven partition sizes are very common in production, and with throttling some 
big partitions take much longer to get fully replicated. Sometimes we just want a 
fully replicated broker (say within 10 minutes, rather than hours). Keeping an 
under-replicated broker in the system for a long time adds operational 
complexity; for example, we need to be careful that no other broker goes offline 
during the replication process.

2. Other situations, such as an outlier broker:

This happens pretty often if the cluster is big, and most of the time it's not 
easy (or at least it's time consuming) to replace a broker, even with EBS. We 
would like to disable a broker as leader without taking it offline, so the 
on-call has time to investigate the problem without terminating it right away. 
With KIP-491 we could add a lot of automation to the system to handle, say, a 
network partition affecting a single broker without actually replacing it.

3. Potential:

If we can manipulate the view of leadership in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers can 
now consume from replicas, but I think there is still a way we can control it). 
Then we can add priority at the client level and isolate clients so they talk to 
only some of the brokers.

This discussion is more about KIP-491; we can surely move it back to the original 
ticket if we feel there is more to discuss there.

 


was (Author: hai_lin):
Thanks [~sql_consulting] point me to this ticket. [~junrao] Just want to hear a 
bit more about why KIP-491 is not in consideration based on your comment above.

{*quote*}

I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it. 

{*quote*}

I feel it's a very useful feature for a lot of operation cases:

 

1. For high replica rate when broker boot up:

To me uneven size of partition on production is very command, with throttle 
some big partitions will take much longer to get fully replicated. Sometimes we 
just want a fully replica broker(like in 10 minutes without replica rather than 
hours). A long time with under replica broker in the system add more complicity 
for operation. For example, we need to be careful there is no other broker is 
offline during the replicating process.

 

2 Other situation like outlier broker

This happen pretty often if the cluster is big, most of the time it's not 
easy(at least time consuming) to replace broker even with EBS. We would like to 
disable a broker as leader but not take it offline. So the on-call have time to 
investigate the problem without terminate it right away. With KIP-491 we can 
add a lot of automation to the system that handle some network partition for a 
single broker without actually replace it.

 

3 Potential

If we can manipulate the view of leader in a cluster, we can do a bit more like 
introduce different leader for producer and consumer(consumer now can consumer 
from replica but I think there is still way we can control it). Then we can add 
priority to the client level and isolate client to talk only some of the 
brokers. 

 

This is more for KIP-491, we can surely move it back to the original ticket if 
we feel there is more discussion for this.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `Unde

[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112508#comment-17112508
 ] 

Hai Lin commented on KAFKA-4084:


Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want to 
hear a bit more about why KIP-491 is not under consideration, based on your 
comment above:

{quote}I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it.{quote}

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up:

Uneven partition sizes are very common in production, and with throttling some 
big partitions take much longer to get fully replicated. Sometimes we just want a 
fully replicated broker (say within 10 minutes, rather than hours). Keeping an 
under-replicated broker in the system for a long time adds operational 
complexity; for example, we need to be careful that no other broker goes offline 
during the replication process.

2. Other situations, such as an outlier broker:

This happens pretty often if the cluster is big, and most of the time it's not 
easy (or at least it's time consuming) to replace a broker, even with EBS. We 
would like to disable a broker as leader without taking it offline, so the 
on-call has time to investigate the problem without terminating it right away. 
With KIP-491 we could add a lot of automation to the system to handle, say, a 
network partition affecting a single broker without actually replacing it.

3. Potential:

If we can manipulate the view of leadership in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers can 
now consume from replicas, but I think there is still a way we can control it). 
Then we can add priority at the client level and isolate clients so they talk to 
only some of the brokers.

This discussion is more about KIP-491; we can surely move it back to the original 
ticket if we feel there is more to discuss there.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.
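
Until something like that exists in the controller, a hedged workaround is to disable 
{{auto.leader.rebalance.enable}} and drive preferred leader election in bounded batches 
from an external tool. The sketch below uses the Admin client's electLeaders API (added 
around Kafka 2.4); the bootstrap address, topic name, partition count and batch size are 
illustrative assumptions, and a real tool would wait for under-replicated partitions to 
settle between batches:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

// Sketch: run preferred leader election in small batches so only a bounded number of
// leaders move at once, instead of letting the controller rebalance everything together.
public class BatchedPreferredElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        // Partitions whose current leader differs from the preferred (first) replica;
        // in practice this list would come from cluster metadata or monitoring.
        List<TopicPartition> imbalanced = new ArrayList<>();
        for (int p = 0; p < 100; p++)
            imbalanced.add(new TopicPartition("my-topic", p));

        int batchSize = 10;   // cap on concurrent leader moves
        try (Admin admin = Admin.create(props)) {
            for (int i = 0; i < imbalanced.size(); i += batchSize) {
                Set<TopicPartition> batch = new HashSet<>(
                        imbalanced.subList(i, Math.min(i + batchSize, imbalanced.size())));
                admin.electLeaders(ElectionType.PREFERRED, batch).partitions().get();
                // a real tool would pause here until UnderReplicatedPartitions settles
            }
        }
    }
}
{code}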



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2020-05-19 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111427#comment-17111427
 ] 

Hai Lin commented on KAFKA-8638:


What's the current status of this KIP? I think this will be very useful when we 
do any operation on the cluster. For example, during a rolling restart we could 
first blacklist the broker, bounce it, and re-enable it once it is fully 
replicated. Most of the time, when a broker comes up after a restart, the small 
partitions get synced faster than the big ones. The broker is under a lot of 
stress while replicating the big partitions, but the small partitions already 
become leaders, so performance is compromised. I think even a simple 
blacklist/whitelist would be very helpful.

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, controller, core
>Affects Versions: 1.1.1, 2.3.0, 2.2.1
>Reporter: GEORGE LI
>Assignee: GEORGE LI
>Priority: Major
>
> Currently, the kafka preferred leader election will pick the broker_id in the 
> topic/partition replica assignments in a priority order when the broker is in 
> ISR. The preferred leader is the broker id in the first position of replica. 
> There are use cases where, even if the first broker in the replica assignment is 
> in ISR, there is a need for it to be moved to the end of ordering (lowest 
> priority) when deciding leadership during preferred leader election.
> Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
> leader. When preferred leader election is run, it will pick 1 as the leader if 
> it's in ISR; if 1 is not online or not in ISR, then pick 2; if 2 is not in ISR, 
> then pick 3 as the leader. There are use cases where, even if 1 is in ISR, we 
> would like it to be moved to the end of ordering (lowest priority) when 
> deciding leadership during preferred leader election. Below is a list of use 
> cases:
>  * If broker_id 1 is a swapped failed host brought up with the last segments 
> or the latest offset without historical data (there is another effort on this), 
> it's better for it not to serve leadership until it has caught up.
>  * The cross-data center cluster has AWS instances which have less computing 
> power than the on-prem bare metal machines. We could put the AWS broker_ids 
> in Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
> without changing the reassignments ordering of the replicas.
>  * If the broker_id 1 is constantly losing leadership after some time: 
> "Flapping". we would want to exclude 1 to be a leader unless all other 
> brokers of this topic/partition are offline. The “Flapping” effect was seen 
> in the past when 2 or more brokers were bad, when they lost leadership 
> constantly/quickly, the sets of partition replicas they belong to will see 
> leadership constantly changing. The ultimate solution is to swap these bad 
> hosts. But for quick mitigation, we can also put the bad hosts in the 
> Preferred Leader Blacklist to move the priority of its being elected as 
> leaders to the lowest.
>  * If the controller is busy serving an extra load of metadata requests and 
> other tasks. we would like to put the controller's leaders to other brokers 
> to lower its CPU load. currently bouncing to lose leadership would not work 
> for Controller, because after the bounce, the controller fails over to 
> another broker.
>  * Avoid bouncing broker in order to lose its leadership: it would be good if 
> we have a way to specify which broker should be excluded from serving 
> traffic/leadership (without changing the replica assignment ordering by 
> reassignments, even though that's quick), and run preferred leader election. 
> A bouncing broker will cause temporary URP, and sometimes other issues. Also 
> a bouncing of broker (e.g. broker_id 1) can temporarily lose all its 
> leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, 
> some of its leaderships will likely failover to broker_id 1 on a replica with 
> 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario even 
> broker_id 2 offline, the 3rd broker can take leadership.
> The current work-around of the above is to change the topic/partition's 
> replica reassignments to move the broker_id 1 from the first position to the 
> last position and run preferred leader election. e.g. (1, 2, 3) => (2, 3, 1). 
> This changes the replica reassignments, and we need to keep track of the 
> original one and restore if things change (e.g. controller fails over to 
> another broker, the swapped empty broker caught up). That’s a rather tedious 
> task.
> KIP is located at 
> [KIP-491|https://cwiki.ap
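
For reference, the "rather tedious" workaround described above (moving the broker to the 
end of the replica list and re-running preferred leader election) can be scripted with 
the Admin client. This is only a hedged sketch; the bootstrap address, topic and replica 
ids are made up, and a real script would wait for the reassignment to complete and later 
restore the original ordering:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

// Sketch of the current workaround: demote broker 1 for one partition by moving it to
// the end of the replica list, then re-run preferred leader election so broker 2 leads.
public class DemoteBrokerWorkaround {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        TopicPartition tp = new TopicPartition("my-topic", 0);

        try (Admin admin = Admin.create(props)) {
            // (1, 2, 3) -> (2, 3, 1): same replicas, but broker 1 now has the lowest
            // leadership priority for this partition
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment =
                    Collections.singletonMap(tp,
                            Optional.of(new NewPartitionReassignment(Arrays.asList(2, 3, 1))));
            admin.alterPartitionReassignments(reassignment).all().get();

            // hand leadership to the new preferred replica (broker 2)
            admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(tp)).partitions().get();
        }
    }
}
{code}

KIP-491 would remove the need to track and restore the original replica ordering that 
this workaround introduces.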

[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-19 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111361#comment-17111361
 ] 

Hai Lin commented on KAFKA-9987:


[~ableegoldman] do you think the pull request is ready to merge, or should I 
apply your patch locally and do some testing?

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case that 
> individual consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription; rather, they will all be subscribed to the 
> same set of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-14 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17107584#comment-17107584
 ] 

Hai Lin commented on KAFKA-9987:


Thanks for putting this up. From the above algorithm, it looks like we will 
introduce a cap on the number of partitions assigned per rebalance. Can you 
highlight the differences from the existing algorithm a bit; is the only 
difference the new C_f and C_c? So the existing one will assign everything, 
filling up all unfilled_members and all partitions, right?

Also, what does the time frame for the implementation look like? Will it come 
with a benchmark to evaluate performance with a large consumer group (> 2k)? 
Will it land in 2.4 as a minor release? The reason we want to upgrade from 2.2 
to 2.4 is to get better performance for large consumer groups (> 2k), and this 
(the stability of large consumer groups) has been a pain point for us for a 
while.

 

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case that 
> individual consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription; rather, they will all be subscribed to the 
> same set of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-14 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17107585#comment-17107585
 ] 

Hai Lin commented on KAFKA-9987:


Saw the PR; I will take a look.

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case that 
> individual consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly.
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription; rather, they will all be subscribed to the 
> same set of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9181) Flaky test kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe

2020-04-30 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096800#comment-17096800
 ] 

Hai Lin edited comment on KAFKA-9181 at 4/30/20, 5:37 PM:
--

I did notice an issue after this patch; here is what I observe.

Consumer metadata might skip the first metadata update because groupSubscription 
is not reset. In my case, the consumer coordinator thread hijacks the update by 
calling newMetadataRequestAndVersion with an outdated groupSubscription before 
joinPrepare() happens. The groupSubscription will be reset later and the metadata 
will eventually be updated, so this is not an issue for the initial subscribe 
(since groupSubscription is empty anyway), but it can happen on a subsequent 
subscribe when groupSubscription is not empty. This creates a discrepancy between 
subscription and groupSubscription: if any new metadata request happens in 
between, metadataTopics will return outdated group information.

h4. The happy path
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} in {{SubscriptionState}} -> {{joinPrepare()}} is 
called in the first {{poll()}} to reset {{groupSubscription}} -> the next time a 
metadata update is requested, {{metadataTopics()}} returns {{subscription}} since 
{{groupSubscription}} is empty -> an update request is issued to the broker to 
fetch partition information for the new topic.

h4. In our case
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} (but not {{groupSubscription}}) in 
{{SubscriptionState}} -> the consumer coordinator heartbeat thread issues a 
metadata request and {{SubscriptionState}} hands out the current 
{{requestVersion}} together with the outdated {{groupSubscription}} -> the 
metadata update request is sent with the outdated subscription -> the response 
comes back to the client and, since {{requestVersion}} is up to date, it resets 
the {{needUpdated}} flag -> {{joinPrepare()}} is called and resets 
{{groupSubscription}} -> no new metadata update request follows because 
{{needUpdated}} was reset -> the next metadata request only happens when 
{{metadata.max.age}} is reached.

 

I saw some discussion in the pull request; let me know if I missed anything here. 
cc [~rsivaram] [~bbejeck]


was (Author: hai_lin):
I did notice some issue after this patch, here is what I observe.

Consumer metadata might skip first metadata update, cause grouopSubscription is 
not reset. In my case, the consumer coordinator thread hijack the update by 
calling newMetadataRequestAndVersion with outdated groupSubscription before 
joinPrepare() happen. The groupSubscription will get reset later and it will 
eventually get update later, and this won't be an issue for initial consumer 
subscribe(since the groupSubscription is empty anyway), but it might happen the 
following subscribe when groupSubscription is not empty. This will create a 
discrepancy between subscription and groupSubscription, if any new metadata 
request happened in between, metadataTopics will return outdated group 
information. 

 
h4. The happy path
 * Consumer call subscribe > Update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} in {{SubscriptionState}} > {{prepareJoin()}} was 
call in first {{poll()}} to reset {{groupSubscription}} -> next time when 
metadata update was call and {{metadataTopics()}} returns {{subscription}} 
since {{groupSubscription}} is empty -> update call issue to broker to fetch 
partition information for new topic

h4. In our case
 * Consumer call subscribe > Update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}}(not {{groupSubscription}}) in {{SubscriptionState}} 
> Consumer Coordinator heartbeat thread call metadata request and 
{{SubscriptionState}} gave away the current requestVersion and outdated 
{{groupSubscription}} > making request for metadata update with outdated 
subscription -> request comes back to client and since {{requestVersion}} is up 
to latest, it reset {{needUpdated}} flag -> {{joinPrepare()}} called and reset 
{{groupSubscription}} > no new metadata update request follow cause 
{{needUpdated}} was reset -> metadata request will happen when 
{{metadata.max.age}} reaches.

 

I saw some discussion in the pull request, don't know if I miss anything here.

> Flaky test 
> kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
> ---
>
> Key: KAFKA-9181
> URL: https://issues.apache.org/jira/browse/KAFKA-9181
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Bill Bejeck
>Assignee: Rajini Sivaram
>Priority: Major
>  Labels: flaky-test, tests
> Fix For: 2.5.0
>
>
> Failed in 
> [ht

[jira] [Comment Edited] (KAFKA-9181) Flaky test kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe

2020-04-30 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096800#comment-17096800
 ] 

Hai Lin edited comment on KAFKA-9181 at 4/30/20, 5:29 PM:
--

I did notice an issue after this patch; here is what I observe.

Consumer metadata might skip the first metadata update because groupSubscription 
is not reset. In my case, the consumer coordinator thread hijacks the update by 
calling newMetadataRequestAndVersion with an outdated groupSubscription before 
joinPrepare() happens. The groupSubscription will be reset later and the metadata 
will eventually be updated, so this is not an issue for the initial subscribe 
(since groupSubscription is empty anyway), but it can happen on a subsequent 
subscribe when groupSubscription is not empty. This creates a discrepancy between 
subscription and groupSubscription: if any new metadata request happens in 
between, metadataTopics will return outdated group information.

h4. The happy path
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} in {{SubscriptionState}} -> {{joinPrepare()}} is 
called in the first {{poll()}} to reset {{groupSubscription}} -> the next time a 
metadata update is requested, {{metadataTopics()}} returns {{subscription}} since 
{{groupSubscription}} is empty -> an update request is issued to the broker to 
fetch partition information for the new topic.

h4. In our case
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} (but not {{groupSubscription}}) in 
{{SubscriptionState}} -> the consumer coordinator heartbeat thread issues a 
metadata request and {{SubscriptionState}} hands out the current 
{{requestVersion}} together with the outdated {{groupSubscription}} -> the 
metadata update request is sent with the outdated subscription -> the response 
comes back to the client and, since {{requestVersion}} is up to date, it resets 
the {{needUpdated}} flag -> {{joinPrepare()}} is called and resets 
{{groupSubscription}} -> no new metadata update request follows because 
{{needUpdated}} was reset -> the next metadata request only happens when 
{{metadata.max.age}} is reached.

 

I saw some discussion in the pull request; let me know if I missed anything here.


was (Author: hai_lin):
I did notice some issue after this patch, here is what I observe.

Consumer metadata might skip first metadata update, cause grouopSubscription is 
not reset. In my case, the consumer coordinator thread hijack the update by 
calling newMetadataRequestAndVersion with outdated groupSubscription before 
joinPrepare() happen. The groupSubscription will get reset later and it will 
eventually get update later, and this won't be an issue for initial consumer 
subscribe(since the groupSubscription is empty anyway), but it might happen the 
following subscribe when groupSubscription is not empty. This will create a 
discrepancy between subscription and groupSubscription, if any new metadata 
request happened in between, metadataTopics will return outdated group 
information. 

 
h4. The happy path
 * Consumer call subscribe -> Update {{needUpdated}}, bump up 
{{requestVersion}} and update {{subscription}} in {{SubscriptionState}} -> 
{{prepareJoin()}} was call in first {{poll()}} to reset {{groupSubscription}} 
-> next time when metadata update was call and {{metadataTopics()}} returns 
{{subscription}} since {{groupSubscription}} is empty -> update call issue to 
broker to fetch partition information for new topic

h4. In our case
 * Consumer call subscribe -> Update {{needUpdated}}, bump up 
{{requestVersion}} and update {{subscription}}(not {{groupSubscription}}) in 
{{SubscriptionState}} -> Consumer Coordinator heartbeat thread call metadata 
request and {{SubscriptionState}} gave away the current requestVersion and 
outdated {{groupSubscription}} -> making request for metadata update with 
outdated subscription -> request comes back to client and since 
{{requestVersion}} is up to latest, it reset {{needUpdated}} flag -> 
{{joinPrepare()}} called and reset {{groupSubscription}} -> no new metadata 
update request follow cause {{needUpdated}} was reset -> metadata request will 
happen when {{metadata.max.age}} reaches.

 

I saw some discussion in the pull request, don't know if I miss anything here.

> Flaky test 
> kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
> ---
>
> Key: KAFKA-9181
> URL: https://issues.apache.org/jira/browse/KAFKA-9181
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Bill Bejeck
>Assignee: Rajini Sivaram
>Priority: Major
>  Labels: flaky-test, tests
> Fix For: 2.5.0
>
>
> Failed in 
> [https://builds.apache.o

[jira] [Commented] (KAFKA-9181) Flaky test kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe

2020-04-30 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096800#comment-17096800
 ] 

Hai Lin commented on KAFKA-9181:


I did notice an issue after this patch; here is what I observe.

Consumer metadata might skip the first metadata update because groupSubscription 
is not reset. In my case, the consumer coordinator thread hijacks the update by 
calling newMetadataRequestAndVersion with an outdated groupSubscription before 
joinPrepare() happens. The groupSubscription will be reset later and the metadata 
will eventually be updated, so this is not an issue for the initial subscribe 
(since groupSubscription is empty anyway), but it can happen on a subsequent 
subscribe when groupSubscription is not empty. This creates a discrepancy between 
subscription and groupSubscription: if any new metadata request happens in 
between, metadataTopics will return outdated group information.

h4. The happy path
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} in {{SubscriptionState}} -> {{joinPrepare()}} is 
called in the first {{poll()}} to reset {{groupSubscription}} -> the next time a 
metadata update is requested, {{metadataTopics()}} returns {{subscription}} since 
{{groupSubscription}} is empty -> an update request is issued to the broker to 
fetch partition information for the new topic.

h4. In our case
 * Consumer calls subscribe -> update {{needUpdated}}, bump up {{requestVersion}} 
and update {{subscription}} (but not {{groupSubscription}}) in 
{{SubscriptionState}} -> the consumer coordinator heartbeat thread issues a 
metadata request and {{SubscriptionState}} hands out the current 
{{requestVersion}} together with the outdated {{groupSubscription}} -> the 
metadata update request is sent with the outdated subscription -> the response 
comes back to the client and, since {{requestVersion}} is up to date, it resets 
the {{needUpdated}} flag -> {{joinPrepare()}} is called and resets 
{{groupSubscription}} -> no new metadata update request follows because 
{{needUpdated}} was reset -> the next metadata request only happens when 
{{metadata.max.age}} is reached.

 

I saw some discussion in the pull request; let me know if I missed anything here.

> Flaky test 
> kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
> ---
>
> Key: KAFKA-9181
> URL: https://issues.apache.org/jira/browse/KAFKA-9181
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Bill Bejeck
>Assignee: Rajini Sivaram
>Priority: Major
>  Labels: flaky-test, tests
> Fix For: 2.5.0
>
>
> Failed in 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26571/testReport/junit/kafka.api/SaslGssapiSslEndToEndAuthorizationTest/testNoConsumeWithoutDescribeAclViaSubscribe/]
>  
> {noformat}
> Error Message: org.apache.kafka.common.errors.TopicAuthorizationException: Not 
> authorized to access topics: [topic2]
> Stacktrace: org.apache.kafka.common.errors.TopicAuthorizationException: 
> Not authorized to access topics: [topic2]
> Standard Output: Adding ACLs for resource 
> `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, 
> patternType=LITERAL)`: 
>   (principal=User:kafka, host=*, operation=CLUSTER_ACTION, 
> permissionType=ALLOW) 
> Current ACLs for resource `Cluster:LITERAL:kafka-cluster`: 
>   User:kafka has Allow permission for operations: ClusterAction from 
> hosts: * 
> Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, 
> patternType=LITERAL)`: 
>   (principal=User:kafka, host=*, operation=READ, permissionType=ALLOW) 
> Current ACLs for resource `Topic:LITERAL:*`: 
>   User:kafka has Allow permission for operations: Read from hosts: * 
> Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt 
> false ticketCache is null isInitiator true KeyTab is 
> /tmp/kafka6494439724844851846.tmp refreshKrb5Config is false principal is 
> kafka/localh...@example.com tryFirstPass is false useFirstPass is false 
> storePass is false clearPass is false
> principal is kafka/localh...@example.com
> Will use keytab
> Commit Succeeded 
> [2019-11-13 04:43:16,187] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-11-13 04:43:16,191] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-