[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Labels: needs-kip  (was: )

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it mature and automatic restarts of failed tasks become 
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
> Importantly, this setting will only be a default, and any user-specified 
> overrides either in the *worker config*:
>  
> {code:java}
> consumer.partition.assignment.strategy={code}
>  
> or in the *connector config*:
>  
> {code:java}
> "consumer.override.partition.assignment.strategy": " strategy>"{code}
>  
> will still be respected.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is n

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
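
The worst case described above can be reproduced with a small simulation. The following is only a sketch of per-topic range assignment, not the actual {{RangeAssignor}} code; the class name {{RangeAssignmentSketch}}, the member ids, and the topic names are hypothetical, and it assumes every member subscribes to every topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    // Simplified range assignment: for each topic independently, sort the
    // members and hand out contiguous ranges of that topic's partitions.
    // Assumption: all members are subscribed to all topics.
    static Map<String, List<String>> assign(List<String> members,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }

        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int perMember = partitions / sorted.size();
            int extra = partitions % sorted.size();
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` members each receive one additional partition.
                int count = perMember + (i < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Ten hypothetical topics with one partition each, three sink tasks.
        Map<String, Integer> topics = new TreeMap<>();
        for (int t = 0; t < 10; t++) {
            topics.put("topic" + t, 1);
        }
        Map<String, List<String>> result =
                assign(List.of("task-0", "task-1", "task-2"), topics);
        result.forEach((member, parts) ->
                System.out.println(member + " -> " + parts.size() + " partition(s)"));
    }
}
```

Because each topic is divided on its own and the first member in sort order always receives the single partition, {{task-0}} ends up with all ten partitions while the other two tasks idle.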
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters." As Connect and the tooling around it 
mature and automatic restarts of failed tasks become more popular, care should 
be taken to ensure that the consumer group churn created by restarting one or 
more tasks doesn't compromise the availability of other tasks by forcing them 
to temporarily yield up all of their partitions just to reclaim them after a 
rebalance has completed.

With that in mind, we should alter the default consumer configuration for sink 
tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically.
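
To illustrate why listing both assignors permits a rolling upgrade, here is a rough sketch of how a group might settle on a common assignor. It assumes a simplified model in which only assignors supported by every member are candidates and each member votes for its most-preferred candidate; the class name {{ProtocolSelectionSketch}} is hypothetical, and the sketch does not model the eager versus cooperative rebalance protocol issue tracked in KAFKA-12477.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProtocolSelectionSketch {

    // Each inner list is one member's assignors in preference order.
    // Assumption: the group uses the candidate with the most first-choice votes,
    // where candidates are the assignors that every member supports.
    static String select(List<List<String>> memberPreferences) {
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("Members share no common assignor");
        }

        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String assignor : prefs) {
                if (candidates.contains(assignor)) {
                    votes.merge(assignor, 1, Integer::sum);
                    break; // a member votes only for its top-ranked candidate
                }
            }
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        List<String> oldWorker = List.of("range");
        List<String> newWorker = List.of("cooperative-sticky", "range");

        // Mid-upgrade: one task still runs on a worker that only knows "range".
        System.out.println(select(List.of(oldWorker, newWorker, newWorker)));
        // Upgrade complete: every task lists both assignors.
        System.out.println(select(List.of(newWorker, newWorker, newWorker)));
    }
}
```

Under these assumptions, the mixed group falls back to {{range}} because it is the only assignor all members share, and the group moves to {{cooperative-sticky}} once the last old member is gone.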

Importantly, this setting will only be a default, and any user-specified 
overrides either in the *worker config*:

 
{code:java}
consumer.partition.assignment.strategy={code}
 

or in the *connector config*:

 
{code:java}
"consumer.override.partition.assignment.strategy": ""{code}
 

will still be respected.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Workaround: manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in releases 2.4 and 
later that don't include this improvement: manually override either a connector 
configuration or an entire worker configuration.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partiti

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.
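
The precedence described above could be sketched roughly as follows. This is not the actual {{Worker}} implementation; the class {{SinkConsumerConfigSketch}} and its helper are hypothetical, and the sketch simply ignores connector-level overrides when the override policy is not {{ALL}}, which may differ from how a real worker validates such configurations.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConsumerConfigSketch {

    static final String STRATEGY = "partition.assignment.strategy";
    static final String DEFAULT_STRATEGY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor";

    // Builds the consumer config for a sink task: proposed default first,
    // then worker-level "consumer." properties, then connector-level
    // "consumer.override." properties if the worker policy allows them.
    static Map<String, String> consumerConfig(Map<String, String> workerConfig,
                                              Map<String, String> connectorConfig) {
        Map<String, String> result = new HashMap<>();
        result.put(STRATEGY, DEFAULT_STRATEGY);

        copyPrefixed(workerConfig, "consumer.", result);

        String policy = workerConfig.get("connector.client.config.override.policy");
        if ("ALL".equalsIgnoreCase(policy)) {
            copyPrefixed(connectorConfig, "consumer.override.", result);
        }
        return result;
    }

    // Copies every entry whose key starts with the prefix, stripping the prefix.
    private static void copyPrefixed(Map<String, String> source, String prefix,
                                     Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> worker = new HashMap<>();
        worker.put("connector.client.config.override.policy", "ALL");

        Map<String, String> connector = new HashMap<>();
        connector.put("consumer.override.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        System.out.println(consumerConfig(worker, connector).get(STRATEGY));
    }
}
```

With no user-specified properties the default list is used unchanged, so existing deployments that never set a strategy would pick up the new behavior while explicit settings continue to win.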

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in releases 2.4 and 
later that don't include this improvement: manually override either a connector 
configuration or an entire worker configuration.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partition assignment strategy.

 

For example, to update a connector formerly configured to use the 
{{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} strategy, 
add the following to the connector configuration:
{code:java}
"consumer.override.partition.assignment.strategy": 
"org.apache.kafka.clients.consumer.RoundRobinA

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in Apache Kafka 
releases 2.4 and later that do not include this improvement: either set a 
value for the {{consumer.partition.assignment.strategy}} property in the 
*worker configuration*, or set a value for the 
{{consumer.override.partition.assignment.strategy}} property in one or more 
*connector configurations* when per-connector client overrides are enabled in 
the worker config with {{connector.client.config.override.policy=ALL}}.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partition assignment strategy. For example, to update a co

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in Apache Kafka 
releases 2.4 and later that do not include this improvement: either set a 
value for the {{consumer.partition.assignment.strategy}} property in the 
*worker configuration*, or set a value for the 
{{consumer.override.partition.assignment.strategy}} property in one or more 
*connector configurations* when per-connector client overrides are enabled in 
the worker config with {{connector.client.config.override.policy=ALL}}.

  was:
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javad

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless*

 

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in Apache Kafka 
releases 2.4 and later that do not include this fix: either set the following 
in the *worker configuration*:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
or set the following in *connector configurations* when per-connector client 
overrides are enabled in the worker config with 
{{connector.client.config.override.policy=ALL}}:
{code:java}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

  was:
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 

[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.

h3. Proposed Change

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.
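
The precedence described above (framework default, then worker-level {{consumer.}} properties, then connector-level {{consumer.override.}} properties) could be sketched as follows. This is not the actual {{Worker}} implementation; the class {{SinkConsumerConfigSketch}}, its method names, and the boolean flag standing in for the override policy are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConsumerConfigSketch {

    public static final String STRATEGY = "partition.assignment.strategy";

    // Proposed default: prefer the cooperative assignor, keep range as a fallback.
    public static final String DEFAULT_STRATEGY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor";

    /**
     * Builds a sink task's consumer properties. Later sources win:
     * default, then worker "consumer." properties, then connector
     * "consumer.override." properties (only if overrides are allowed).
     */
    public static Map<String, String> consumerConfig(Map<String, String> workerProps,
                                                     Map<String, String> connectorProps,
                                                     boolean overridesAllowed) {
        Map<String, String> result = new HashMap<>();
        result.put(STRATEGY, DEFAULT_STRATEGY);
        copyPrefixed(workerProps, "consumer.", result);
        if (overridesAllowed) {
            copyPrefixed(connectorProps, "consumer.override.", result);
        }
        return result;
    }

    // Copies entries whose key starts with the prefix, stripping the prefix.
    private static void copyPrefixed(Map<String, String> source, String prefix,
                                     Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of();
        Map<String, String> connector = Map.of(
                "consumer.override.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // With overrides allowed, the connector-level value replaces the default.
        System.out.println(consumerConfig(worker, connector, true).get(STRATEGY));
    }
}
```

Applying the default first and layering user properties on top keeps existing deployments unchanged whenever a strategy has already been set explicitly.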

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, it should only be applied to the Connect framework in an upcoming minor 
release.

h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in AK releases 2.4 
and later that don't also include this fix: either set the following in the 
*worker configuration*:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
or set the following in *connector configurations* when per-connector client 
overrides are enabled in the worker config with 
{{connector.client.config.override.policy=ALL}}:
{code:java}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.

h3. Proposed Change
[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.
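
The rolling-upgrade behavior described above relies on the group settling on an assignor that every member supports. The following is a simplified model of that selection, not broker code: the class {{ProtocolSelectionSketch}} is hypothetical, and workers that have not been upgraded are assumed to advertise only {{range}} because they leave the strategy at its default.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProtocolSelectionSketch {

    /**
     * Picks an assignor for the group. Only protocols supported by every
     * member are candidates; each member then votes for the first candidate
     * in its own preference list, and the candidate with the most votes wins.
     */
    public static String select(List<List<String>> memberPreferences) {
        Set<String> common = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> preferences : memberPreferences) {
            common.retainAll(preferences);
        }
        if (common.isEmpty()) {
            throw new IllegalStateException("Members share no common protocol");
        }
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> preferences : memberPreferences) {
            for (String protocol : preferences) {
                if (common.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break; // one vote per member, for its most preferred candidate
                }
            }
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of("cooperative-sticky", "range");
        List<String> notUpgraded = List.of("range");
        // Mid-upgrade: only "range" is supported by everyone.
        System.out.println(select(List.of(upgraded, notUpgraded)));
        // After the upgrade: every member prefers "cooperative-sticky".
        System.out.println(select(List.of(upgraded, upgraded)));
    }
}
```

Listing {{RangeAssignor}} second is what keeps a mixed group working: as long as one member supports only {{range}}, it is the sole common candidate.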

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, it should only be applied to the Connect framework in an upcoming minor 
release.

h3. Manually setting the partition assignment strategy
There is a simple workaround to achieve the same behavior in AK releases that 
don't include this fix: either set the following in the *worker configuration*:
{code}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

or set the following in *connector configurations* when per-connector client 
overrides are enabled in the worker config with 
{{connector.client.config.override.policy=ALL}}:
{code}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

h3. Proposed Change
We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. However, this setting will be 
overridden by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides are enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, it should only be applied to the Connect framework in an upcoming minor 
release.

h3. Manually setting the partition assignment strategy
There is a simple workaround to achieve the same behavior in AK releases that 
don't include this fix: either set the following in the *worker configuration*:
{code}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

or set the following in *connector configurations* when per-connector client 
overrides are enabled in the worker config with 
{{connector.client.config.override.policy=ALL}}:
{code}
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by setting the {{partition.assignment.strategy}} property of sink 
task consumers to the list 
{{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
connector-level override is present.

This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, it should only be applied to the Connect framework in an upcoming minor 
release. This does not preclude users from following the steps outlined here to 
improve sink connector behavior on existing clusters by setting 
{{consumer.partition.assignment.strategy}} in their worker configs to 
{{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
org.apache.kafka.clients.consumer.RangeAssignor}}, or by doing the same on a 
per-connector basis using the 
{{consumer.override.partition.assignment.strategy}} property.

  was:
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics t