[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-04-02 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314151#comment-17314151
 ] 

Chris Egerton commented on KAFKA-12463:
---

Looks like this may be accomplished by 
[KIP-726|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248],
 which currently proposes that the default consumer partition assignment 
strategy be upgraded to {{["cooperative-sticky", "range"]}} for all consumer 
applications (including Connect).
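For reference, a consumer configured explicitly with that proposed ordering would
look roughly like the following (just a sketch using the fully-qualified class
names; the exact default is still under discussion in the KIP):
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}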

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it matures and automatic restarts of failed tasks become 
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
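> A minimal sketch of how the {{Worker}} might apply this default (a hypothetical
> helper, not the actual implementation; worker-level and connector-level overrides
> would still take precedence because the default is only applied when the property
> is absent):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
> import org.apache.kafka.clients.consumer.RangeAssignor;
>
> public class SinkConsumerDefaults {
>     // Illustrative only: apply the new default strategy list unless the user has
>     // already configured partition.assignment.strategy through an override.
>     static void applyDefaultAssignors(Map<String, Object> consumerProps) {
>         consumerProps.putIfAbsent(
>             ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>             CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
>     }
> }
> {code}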
> Importantly, this setting will only be a default, and any user-specified 
> overrides either in the *worker config*:
>  
> {code:java}
>

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-24 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308235#comment-17308235
 ] 

Chris Egerton commented on KAFKA-12463:
---

Thanks [~kkonstantine]--I'll admit I was hoping to avoid a KIP, as this seems 
like more of an implementation detail and not something most users would consider 
when using Connect, but the longer the conversation has gone on, the clearer it's 
become that this is a measure-twice, cut-once situation with multiple perspectives 
and tradeoffs that need to be taken into account. Regardless of whether a KIP is 
strictly required or not, I certainly agree it'd be best at this point to proceed 
with one, so I'll add the {{needs-kip}} tag and will hopefully get a design doc 
drafted and on the mailing list within the next week or two.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it matures and automatic restarts of failed tasks become 
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will 

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-22 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306798#comment-17306798
 ] 

Konstantine Karantasis commented on KAFKA-12463:


There's a long and interesting history of comments here already, and I admit I'll 
have to come back soon to read them in more detail.

Just a meta comment for now: I think that changing the default assignor will be a 
noticeable change in behavior in several circumstances. That makes this a 
user-facing change for anyone running sink connectors, so I think it'd be good to 
highlight it with a brief KIP. 

What do you think [~ChrisEgerton] [~rhauch]?

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it matures and automatic restarts of failed tasks become 
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
> Importantly, this se

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304519#comment-17304519
 ] 

Chris Egerton commented on KAFKA-12463:
---

Thanks Randall--
{quote}I agree with you that we should fix the behavior. But the fix will 
appear only in certain releases, and not all users will be able to upgrade to 
those releases to get the fix. So, documenting the simple workaround will help 
users in those situations. I suggested documenting the workaround here to help 
any users that do stumble upon this issue when searching for a potential fix, 
regardless of whether the workaround is also documented elsewhere.
{quote}
That's an interesting take... I guess the difference I perceive here is that 
this is a proposed improvement, not really a fix. I don't know if any users are 
going to stumble onto this after something breaks since it doesn't address 
anything breaking to begin with. Either way, I guess we can try to iron out the 
workaround a little more here (will do now), but hopefully we can also put 
something in the public-facing docs for Connect (maybe as part of the upgrade 
notes if we change the default consumer partition assignment strategy); that 
seems more likely to reach the target audience here.

 

With regards to steps forward and your question, [~ableegoldman], I wasn't 
certain one way or the other about round robin vs cooperative sticky 
assignment. I had a few thoughts:
 * When the set of task configurations changes, the advantages of stickiness or 
the cooperative protocol are basically irrelevant since each task and its 
accompanying consumer is brought down and a new one is brought up in its place.
 * In a completely stable world where consumer assignment never changes, round 
robin would be ideal (out of the ones available right now) as it'd guarantee 
as-even-as-possible spread of partitions within the same topic across tasks.
 * Otherwise, the only times the cooperative protocol might come into play are:
 ** When a consumer subscription is updated (which would cause a consumer 
rebalance but keep all sink tasks running)
 ** When a task is started or shut down without the connector being 
reconfigured (which may happen when a single task fails, a failed task is 
restarted, a new worker joins the group, or an existing worker leaves the group)
 ** When the consumer for a task falls out of the group (likely because the 
task is taking too long to process data provided to it by the framework).
 * Under most of these scenarios, stickiness would provide no benefit: whenever a 
consumer is created, a brand-new task is brought up along with it, so there is no 
prior state for a sticky assignment to preserve. The only exception is an update 
to a consumer subscription, but even that would require some changes to how 
Connect invokes 
[SinkTask::open|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-]
 and 
[SinkTask::close|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close-java.util.Collection-]
 to basically fake cooperative rebalancing in the way that's proposed in the 
[StickyAssignor 
docs|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html]
 under the "Impact on {{ConsumerRebalanceListener}}" section (sketched below).

The benefits of the sticky assignor seem pretty slim, and would require some 
one-off tooling from Connect to basically re-implement the cooperative 
protocol. This is why I'm personally not in favor of it, but would love to 
learn more if there's something I'm missing here.

So with all that in mind, we can transform the question into whether it's 
important to favor any of the scenarios outlined above where cooperative 
rebalancing might be of some benefit, and if not, opt to use the round robin 
assignor. There's one scenario that comes to mind that I think might be worth 
considering, and counts in favor of using the cooperative sticky assignor: if 
there's any kind of tooling in place that restarts failed tasks automatically, 
there will be significant consumer churn as consumers may rapidly fall out of 
and join the group.

I think this scenario is going to become more and more common as adoption of 
Connect increases and both it and the tooling around it mature, and as a 
result, I'm gently in favor of trying to use the {{CooperativeStickyAssignor}} 
now, or at least, when it becomes possible in the Connect framework once work 
on KAFKA-12477 and KAFKA-12487 completes.

I raised this point on the PR but it bears repeating here: we might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step). I raised the round robin assign

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304452#comment-17304452
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Out of curiosity, why use the RoundRobinAssignor rather than the StickyAssignor 
if you want to avoid cooperative? I'm not a Connect expert so maybe there's 
some nuance here, but the StickyAssignor is generally preferred (we were 
planning to make that the default in 3.0 before the CooperativeStickyAssignor 
came along).

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically. But, this setting will 
> be overwritten by any user-specified 
> {{consumer.partition.assignment.strategy}} property in the worker 
> configuration, and by any user-specified 
> {{consumer.override.partition.assignment.strategy}} property in a sink 
> connector configuration when per-connector client overrides is enabled in the 
> worker config with {{connector.client.config.override.policy=ALL}}.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fi

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304248#comment-17304248
 ] 

Randall Hauch commented on KAFKA-12463:
---

Thanks for the response and logging KAFKA-12487, [~ChrisEgerton]. 

{quote}

I'd also just like to point out that the goal here is to improve the 
out-of-the-box behavior of Connect for users; although workarounds are nice to 
have, the goal here shouldn't be to focus on documenting them but instead, to 
make them obsolete. If we decide not to improve the default behavior of Connect 
then we can document this somewhere else that's a little more visible for users 
as opposed to developers. 

{quote}

I agree with you that we should fix the behavior. But the fix will appear only 
in certain releases, and not all users will be able to upgrade to those 
releases to get the fix. So, documenting the simple workaround will help users 
in those situations. I suggested documenting the workaround here to help any 
users that do stumble upon this issue when searching for a potential fix, 
regardless of whether the workaround is also documented elsewhere.

IIUC, given KAFKA-12487 is likely more challenging to fix and has not yet been 
addressed, the shorter term proposal here is to change the worker to set the 
following on consumer configs used in sink connectors:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This works because the round robin assignor will be used only after all workers 
have been upgraded, and this gives us more balanced consumer assignments. Plus, it 
is backward compatible, since any user-supplied value still takes precedence over 
this new default, whether set in the worker config via:
{code:java}
consumer.partition.assignment.strategy=...
{code}
or in any connector configs that use client overrides via:
{code:java}
consumer.override.partition.assignment.strategy=...
{code}
 

If that is the proposal, WDYT about updating the description to make this clearer? 
Essentially, I suggest this issue's description state the problem (that section is 
good), propose a solution using round robin (mostly reusing your proposed section, 
with round robin rather than cooperative), document the workaround, and finally 
explain why the {{RoundRobinAssignor}} was used instead of the 
{{CooperativeStickyAssignor}}. 

 

And if we're on the same page, then I think it's worth updating the PR to 
implement the proposed fix. I'll state the same on a review for the PR.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 s

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302981#comment-17302981
 ] 

Chris Egerton commented on KAFKA-12463:
---

Fun fact--Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partition assignor for 
Connect sink task consumers to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite, I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true if the second rolling restart includes a change to the consumers' 
partition assignment strategy that removes the {{RangeAssignor}} from the list and 
leaves only the {{CooperativeStickyAssignor}}, yes. Given that and the 
recently-discovered KAFKA-12487, I don't think we're at the point of updating the 
description to describe an upgrade process that accommodates the 
{{CooperativeStickyAssignor}}, at least not without a warning notice preceding any 
hypothetical designs we might implement once both of these pain points are 
addressed. I've updated the description accordingly; feel free to make any edits 
as long as we don't actually instruct users to configure their workers with the 
{{CooperativeStickyAssignor}}, as that will lead to bad worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, 
but it turns out that the overwhelming majority of connectors out there are 
unaffected, purely by a quirk of how people tend to implement 
{{Connector::taskConfigs}}. The only cases I've been able to find where this bug 
comes up are in the file stream connectors. If we believe this is likely to affect 
other connectors, I personally think we should address that bug instead of working 
around it or documenting it as a potential gotcha.
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy and not just specifically moving from an eager to a cooperative one. 
This becomes especially likely if a task isn't able to respond to a shutdown 
request within the graceful shutdown period (which defaults to five seconds). 
The workaround here is to enable both partition assignment strategies for the 
consumer with a preference for the desired strategy; that way, the desired 
strategy will take effect as soon as every consumer in the group has been 
updated, and nobody will break beforehand. I'll update the workaround section 
in the description to include that info.
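For example, a worker config that applies that workaround with the eager assignors 
(a sketch only; per the note above, the {{CooperativeStickyAssignor}} shouldn't be 
listed until KAFKA-12477 and KAFKA-12487 are addressed) could look like:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
Once every worker has picked up that config, the group switches to the round robin 
assignor; until then, it keeps using the range assignor and nothing breaks.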

 

I'd like to point out that the goal here is to improve the out-of-the-box 
behavior of Connect for users; although workarounds are nice to have, the goal 
here shouldn't be to focus on documenting them but instead, to make them 
obsolete. If we decide not to improve the default behavior of Connect then we 
can document this somewhere else that's a little more visible for users as 
opposed to developers.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> 

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302680#comment-17302680
 ] 

Randall Hauch commented on KAFKA-12463:
---

[~ChrisEgerton], I edited the description of this issue a bit to clarify the 
proposal versus the workaround, and formatted the config settings to make them 
more prominent.

Given [~ableegoldman]'s comments about a double rolling upgrade being required 
to ensure the consumer group uses the cooperative assignor, WDYT about updating 
this issue's description to talk about rolling upgrades? Specifically, isn't it 
true that after the Connect worker config is changed, one rolling restart of the 
Connect workers will not immediately use the cooperative assignor (since the last 
worker to restart will only use the range assignor, the default), and that the 
cooperative assignor will only be used after a second rolling restart?

Interestingly, IIUC when a connector configuration is changed to specify the 
partition assignors in AK 2.4 or later, there are a few challenges to make this 
work:
 # In both standalone and distributed, Connect only updates the task configs if 
and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
 # If Connect does attempt to restart the connector's tasks, the herder does 
not wait for the tasks to stop before starting any of them. This means that it 
may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.

Seems like we should at least mention this in the workaround above. WDYT?

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups 

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302225#comment-17302225
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Filed https://issues.apache.org/jira/browse/KAFKA-12477 – would be nice to 
target 3.0 here, both for this ticket and for 
https://issues.apache.org/jira/browse/KAFKA-12473

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
> connector-level override is present.
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
>  
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release. This does not preclude users from following the steps outlined here 
> to improve sink connector behavior on existing clusters by modifying their 
> worker configs to use {{consumer.partition.assignment.strategy =}} 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a 
> per-connector basis using the 
> {{consumer.override.partition.assignment.strategy}} property.





[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302205#comment-17302205
 ] 

Chris Egerton commented on KAFKA-12463:
---

Haha, I should have been clearer–we do use our own type of coordinator for 
Connect, but that's for a separate thing. The 
[WorkerCoordinator|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java]
 is used by workers to assign connectors and tasks amongst themselves.

For sink connectors, we create a consumer group and each task is given a single 
consumer in that group (the 
[WorkerSinkTask|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java]
 class contains a lot of this logic). This is the type of consumer group I 
think we should improve the assignment logic for in Connect.

You've laid out the issues with using any kind of cooperative assignor with the 
consumer pretty well. If we can improve the consumer rebalance logic to enable 
a one-bounce upgrade to the cooperative protocol that'd be fantastic; in the 
meantime, it may be worth it to switch to another assignor that, while not 
cooperative, is still friendlier to sink connectors that consume from a large 
number of low-partition topics.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
> connector-level override is present.
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
>  
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix,

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302202#comment-17302202
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Maybe we could improve things on the Consumer side to avoid the need for a 
second rolling bounce, though. I haven't thought through all the details here, but 
off the top of my head it seems like we can actually choose the rebalancing 
protocol based on the chosen assignor after the first rebalance. The very 
first rebalance after a consumer is bounced would require selecting the EAGER 
protocol, but that hardly matters since after restarting the consumer won't 
have any partitions to revoke to begin with. After a successful rebalance, the 
ConsumerCoordinator is informed that the group coordinator selected the 
CooperativeStickyAssignor, which of course indicates that all members have been 
upgraded to the new byte code and support cooperative rebalancing. At that 
point we can safely drop the EAGER protocol and choose the highest-supported 
protocol of the CooperativeStickyAssignor, which is of course the COOPERATIVE 
protocol.

tl;dr: we don't need to stick with the same rebalancing protocol that gets 
selected in the ConsumerCoordinator's constructor; why not adapt to new 
information as it comes in?
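
A rough sketch of that idea (hypothetical names only, not the actual 
ConsumerCoordinator internals):
{code:java}
// Illustrative pseudologic: re-choose the protocol once the group's selected
// assignor is known, instead of fixing it in the constructor.
public class ProtocolChoiceSketch {
    enum Protocol { EAGER, COOPERATIVE }

    static Protocol protocolFor(String selectedAssignorName, boolean completedARebalance) {
        // First rebalance after a bounce: nothing is owned yet, so EAGER costs nothing.
        if (!completedARebalance)
            return Protocol.EAGER;
        // If the group coordinator picked the cooperative-sticky assignor, every member
        // supports cooperative rebalancing, so it's safe to switch protocols.
        return "cooperative-sticky".equals(selectedAssignorName)
                ? Protocol.COOPERATIVE
                : Protocol.EAGER;
    }
}
{code}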

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
> connector-level override is present.
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
>  
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the C

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302192#comment-17302192
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


I think it still has the ConsumerCoordinator somewhere in the mix, though it would 
be good to understand exactly where/how that gets used within Connect. I took a 
quick look at the code, and a Connect Worker will instantiate a KafkaConsumer, 
which in turn will always have a ConsumerCoordinator and therefore a KIP-429-style 
EAGER vs COOPERATIVE protocol. (And yes, the KafkaConsumer in Connect uses group 
management.)



[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302191#comment-17302191
 ] 

Guozhang Wang commented on KAFKA-12463:
---

Connect does not use the ConsumerCoordinator but implements its own 
coordinator, the WorkerCoordinator. I vaguely remember that, while working on 
KIP-429, I looked at the Connect code and concluded that Kafka Connect does not 
require a second rolling bounce; this is what I wrote down:

{code}
Note the difference with KIP-415 here: since on consumer we do not have the 
luxury to leverage on list of built-in assignors since it is user-customizable 
and hence would be black box to the consumer coordinator, we'd need two rolling 
bounces instead of one rolling bounce to complete the upgrade, whereas Connect 
only need one rolling bounce.
{code}


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302183#comment-17302183
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Now I don't know if Connect actually uses the ConsumerCoordinator or not. I've 
seen that there is a Connect-specific implementation of the AbstractCoordinator, 
called the WorkerCoordinator, so I would assume not. But I vaguely recall 
someone from Connect telling me once that you do still rely on the 
ConsumerCoordinator somewhere, somehow, in which case I would think you need 
the double rolling bounce as well.

That said, do you even need the CooperativeStickyAssignor? Doesn't Connect have 
its own version of cooperative rebalancing? Can't you just use the normal 
(EAGER) StickyAssignor? I really don't have much insight into how the Consumer 
client is embedded in Connect, so maybe the consumer-level cooperative protocol 
would still be helpful on top of whatever Connect does.
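
For what it's worth, either assignor boils down to a consumer config that can already be set per connector today via the override property named in the issue description, assuming the worker permits connector client config overrides. Below is a sketch with a placeholder connector name and class; swapping in org.apache.kafka.clients.consumer.StickyAssignor would try the eager sticky assignor instead:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class SinkConnectorOverrideSketch {
    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "my-sink");                                  // placeholder name
        connectorConfig.put("connector.class", "com.example.MySinkConnector");   // placeholder class
        connectorConfig.put("topics", "my-topic");
        connectorConfig.put("tasks.max", "4");
        // Per-connector override of the sink task consumers' assignor list; listing both
        // assignors keeps the group compatible with tasks that have not picked it up yet.
        connectorConfig.put("consumer.override.partition.assignment.strategy",
                CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        connectorConfig.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}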


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302179#comment-17302179
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Wait, no, I double-checked the code and you actually do need the two rolling 
bounces for the plain Consumer client. You can't put the 
CooperativeStickyAssignor first in the list; it has to be at a lower priority 
during the first rolling bounce. The reason is that we choose which rebalancing 
protocol to follow based on the first assignor in the list, regardless of 
whether that one is actually chosen for the rebalance or is even supported by 
all consumers. This happens inside the ConsumerCoordinator constructor, so we 
don't know anything about the other consumers at that point. And we need to 
know the protocol before a rebalance starts, so that we revoke all partitions 
in the case of EAGER, which is why we can't just wait until an assignor is 
chosen during the rebalance.
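
In configuration terms, the two rolling bounces for a plain consumer application would look roughly like the sketch below (the ordering in the first phase follows the constraint described in this comment; this is an illustration, not an excerpt from KIP-429):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class ConsumerUpgradeSketch {

    // First rolling bounce: every consumer learns the cooperative assignor, but the old
    // assignor stays ahead of it in the list, so the group keeps rebalancing eagerly.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RangeAssignor.class.getName() + "," + CooperativeStickyAssignor.class.getName());
        return props;
    }

    // Second rolling bounce: the old assignor is removed, allowing the consumers to
    // switch over to the COOPERATIVE rebalance protocol.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        return props;
    }
}
{code}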


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302167#comment-17302167
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Isn't the upgrade path for Connect identical to the one for the Consumer? 
There's no mention of Connect in KIP-429 because that KIP focuses on 
cooperative rebalancing for the plain Consumer and Kafka Streams, while Connect 
had its own separate KIP and protocol for incremental cooperative rebalancing. 
But if Connect relies on the same underlying consumer group protocol, then it 
should apply as well.

That said, I just re-read the Consumer upgrade section and it's got some 
misinformation. It seems to imply that if you have mixed assignor protocols, 
you can end up with some consumers choosing an EAGER assignor while others 
choose the COOPERATIVE one, which is just not true. Fake News!

Anyway, thanks for raising this; I'll go clean it up. That's also a good point 
about one vs. two rolling bounces: technically you don't _need_ the second 
rolling bounce if you listed the CooperativeStickyAssignor first in the 
preferred list on the initial rolling bounce. I guess the intention was to 
leave the consumer group in a stable state, so that if, for example, you 
deployed a new consumer without explicitly configuring it to use the 
CooperativeStickyAssignor, it would fail rather than silently switch back to 
EAGER rebalancing. But it's not required. I'll clarify that in the KIP as well.


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301706#comment-17301706
 ] 

Chris Egerton commented on KAFKA-12463:
---

cc [~rhauch] what do you think about this?



[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301327#comment-17301327
 ] 

Chris Egerton commented on KAFKA-12463:
---

Ah, thanks [~ableegoldman], I'd misread the javadocs for the cooperative sticky 
assignor.

Re: clarity of the upgrade section in KIP-429: I didn't see a specific section 
for Connect, and both of the sections that are provided ("Consumer" and 
"Streams") describe a different procedure than the one I proposed here. It 
seems like an implicit goal of both is to arrive at an end state where all 
consumers provide only the cooperative assignor in their list of supported 
assignors, rather than listing the cooperative assignor first with the other, 
older assignor behind it. I'm wondering whether the absence of that goal is why 
this different approach (which only requires one rolling bounce as opposed to 
two) is viable here but not necessarily for other applications?
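
To make the contrast concrete, the two end states could be written out as follows (an illustrative sketch using the fully qualified class names from the issue description):

{code:java}
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorEndStates {
    // End state targeted by the KIP-429 consumer/Streams upgrade path: only the
    // cooperative assignor remains after the second rolling bounce.
    static final List<String> CONSUMER_END_STATE =
            Arrays.asList(CooperativeStickyAssignor.class.getName());

    // End state proposed in this issue for Connect sink tasks: both assignors are kept,
    // cooperative first, so older workers can always find an assignor in common.
    static final List<String> PROPOSED_CONNECT_DEFAULT = Arrays.asList(
            CooperativeStickyAssignor.class.getName(),
            RangeAssignor.class.getName());
}
{code}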


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301321#comment-17301321
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


[~ChrisEgerton] just FYI, the CooperativeStickyAssignor was introduced in 2.4, 
not 2.3. Also, that upgrade protocol sounds correct, but it should be covered 
in the KIP-429 document. Did you check out the steps listed for Consumer 
upgrades in the section called "Compatibility and Upgrade Path"? 
[Link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]

If you had a hard time finding that info or found the instructions unclear, 
please let me know so I can try to improve the docs.


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301235#comment-17301235
 ] 

Chris Egerton commented on KAFKA-12463:
---

[~bchen225242] can you confirm the upgrade semantics outlined here? I think the 
Kafka group coordinator should force every consumer in the group to use range 
assignment until every consumer in the group is configured to use both the 
cooperative sticky assignor and the range assignor (in that order), at which 
point the group would automatically switch over to cooperative sticky 
assignment. I want to make sure, though, since this upgrade path isn't covered 
in KIP-429.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)