[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302981#comment-17302981 ]

Chris Egerton edited comment on KAFKA-12463 at 3/17/21, 1:34 AM:
-----------------------------------------------------------------

Fun fact: Connect does not like cooperative consumers. See 
https://issues.apache.org/jira/browse/KAFKA-12487 for details.

Until/unless that's addressed, we can't change the default partition assignor 
for Connect to the {{CooperativeStickyAssignor}}.

 

[~rhauch] thanks for the rewrite; I hope that makes things easier to read. 
Responses inline:
{quote}Specifically, isn't it true that after the Connect worker config is 
changed, one rolling restart of the Connect workers will not immediately use 
the cooperative assignor (since the last worker to restart will only use the 
range assignor (the default), and that the cooperative assignor will only be 
used after a second rolling restart.
{quote}
That's true, yes, if the second rolling restart changes the partition 
assignment strategy for the consumers to use only the 
{{CooperativeStickyAssignor}}, removing the {{RangeAssignor}} from the list. 
Given that and the recently-discovered KAFKA-12487, I don't think we're at the 
point where the description can mention an upgrade process that accommodates 
the {{CooperativeStickyAssignor}} without a warning notice preceding any 
hypothetical designs we might implement once both of these pain points are 
addressed. I've updated the description accordingly; feel free to make any 
edits, as long as we don't actually instruct users how to configure their 
workers with the {{CooperativeStickyAssignor}}, since that will lead to bad 
worker behavior.
{quote}In both standalone and distributed, Connect only updates the task 
configs if and only if the task configs changed. If a user *only* changes the 
{{consumer.override.partition.assignment.strategy}} property in a connector 
config, then the task configs will not be changed and the tasks will not be 
restarted.
{quote}
Ah yes, KAFKA-9228. When I initially discovered that one I was a little 
alarmed, but it turns out that the overwhelming majority of connectors out 
there are unaffected, simply because of how people tend to implement 
{{Connector::taskConfigs}}. The only cases I've been able to find where this 
bug comes up are the file stream connectors. If we believe it is likely to 
affect other connectors, I personally think we should address that bug instead 
of working around it or documenting it as a potential gotcha.
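
To illustrate the quirk I mean, here's a minimal, made-up sink connector (the 
class names and the no-op task are hypothetical, nothing here is from a real 
connector) that follows the common pattern of copying the whole connector 
config into every task config; with that pattern, a change to 
{{consumer.override.partition.assignment.strategy}} does change the task 
configs and therefore does trigger a task restart:
{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical connector illustrating the common "pass-through" taskConfigs pattern
public class PassThroughSinkConnector extends SinkConnector {

    private Map<String, String> connectorConfig;

    @Override
    public void start(Map<String, String> props) {
        // Keep the entire connector config, including any consumer.override.* properties
        this.connectorConfig = new HashMap<>(props);
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task config is a copy of the full connector config, so changing any
        // connector property (including consumer.override.partition.assignment.strategy)
        // produces different task configs and causes the tasks to be restarted.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(connectorConfig));
        }
        return configs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return NoOpSinkTask.class;
    }

    @Override
    public void stop() {
        // Nothing to clean up in this sketch
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0.0";
    }

    // Trivial no-op task so the sketch is self-contained
    public static class NoOpSinkTask extends SinkTask {
        @Override
        public String version() {
            return "0.0.0";
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public void put(Collection<SinkRecord> records) {
        }

        @Override
        public void stop() {
        }
    }
}
{code}
The file stream connectors, by contrast, copy only a few specific keys into 
their task configs, which, as far as I can tell, is why they're where 
KAFKA-9228 actually shows up.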
{quote}If Connect does attempt to restart the connector's tasks, the herder 
does not wait for the tasks to stop before starting any of them. This means 
that it may take several such connector & task restarts before the cooperative 
partition assignment strategy will take effect.
{quote}
That's a fair point, and it applies to any change of partition assignment 
strategy, not just to moving from an eager strategy to a cooperative one. It 
becomes especially likely if a task isn't able to respond to a shutdown request 
within the graceful shutdown period (which defaults to five seconds). The 
workaround here is to configure the consumer with both partition assignment 
strategies, listing the desired strategy first; that way, the desired strategy 
takes effect as soon as every consumer in the group has been updated, and 
nothing breaks beforehand. I'll update the workaround section in the 
description to include that info.
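
For example (just a sketch of the worker-level form of that workaround; I'm 
assuming a cluster whose sink connectors should move from the default 
{{RangeAssignor}} to the {{RoundRobinAssignor}}), the consumer would be given 
both strategies, desired one first:
{code:java}
# Worker config: desired strategy first, current strategy second as a fallback
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
Once every worker (and therefore every sink-task consumer) has picked up that 
config, the group switches to the preferred strategy; until then, the common 
{{RangeAssignor}} keeps the group functional.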

 

I'd also like to point out that the goal here is to improve the out-of-the-box 
behavior of Connect for users; although workarounds are nice to have, the focus 
shouldn't be on documenting them but on making them obsolete. If we decide not 
to improve the default behavior of Connect, then we can document this somewhere 
more visible to users, as opposed to developers. And there aren't any technical 
limitations preventing us from choosing a non-cooperative assignor right now 
and running with it, so if worst comes to worst, we might consider switching 
to, e.g., the {{RoundRobinAssignor}} and calling that good enough.
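
Concretely, and purely as a sketch of that fallback (not a settled plan), the 
Worker-set default from the proposed change below would just swap in the 
{{RoundRobinAssignor}}, keeping the {{RangeAssignor}} listed second so that 
consumer groups keep working during a rolling upgrade:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}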


> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
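> For illustration, a small standalone sketch (not part of the original proposal; 
> the class, topic, and member names are made up, and it assumes the 2.x-era 
> assignor API) that reproduces this worst case by invoking the {{RangeAssignor}} 
> directly:
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
> import org.apache.kafka.clients.consumer.RangeAssignor;
> import org.apache.kafka.common.TopicPartition;
>
> public class RangeAssignorWorstCase {
>     public static void main(String[] args) {
>         // Ten topics with one partition each
>         Map<String, Integer> partitionsPerTopic = new HashMap<>();
>         List<String> topics = new ArrayList<>();
>         for (int i = 1; i <= 10; i++) {
>             String topic = "topic-" + i;
>             partitionsPerTopic.put(topic, 1);
>             topics.add(topic);
>         }
>
>         // Three consumers, all subscribed to all ten topics
>         Map<String, Subscription> subscriptions = new HashMap<>();
>         subscriptions.put("consumer-1", new Subscription(topics));
>         subscriptions.put("consumer-2", new Subscription(topics));
>         subscriptions.put("consumer-3", new Subscription(topics));
>
>         Map<String, List<TopicPartition>> assignment =
>                 new RangeAssignor().assign(partitionsPerTopic, subscriptions);
>
>         // Expected: consumer-1 gets all ten partitions; the other two get none
>         assignment.forEach((member, partitions) ->
>                 System.out.println(member + " -> " + partitions));
>     }
> }
> {code}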
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically. However, this setting will 
> be overridden by any user-specified 
> {{consumer.partition.assignment.strategy}} property in the worker 
> configuration, and by any user-specified 
> {{consumer.override.partition.assignment.strategy}} property in a sink 
> connector configuration when per-connector client overrides are enabled in the 
> worker config with {{connector.client.config.override.policy=ALL}}.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release.
> h3. Manually setting the partition assignment strategy
> There is a simple workaround to achieve the same behavior in AK releases 2.4 
> and later that don't also include this improvement: either set a value for 
> the {{consumer.partition.assignment.strategy}} property in the *worker 
> configuration*, or set a value for the 
> {{consumer.override.partition.assignment.strategy}} property in one or more 
> *connector configurations* when per-connector client overrides are enabled in 
> the worker config with {{connector.client.config.override.policy=ALL}}.
> In order to avoid task failures while the connector is being reconfigured, it 
> is highly recommended that the consumer be configured with a list of both the 
> new and the current partition assignment strategies, instead of just the new 
> partition assignment strategy. For example, to update a connector configured 
> to use the {{RangeAssignor}} strategy to use the {{RoundRobinAssignor}} 
> strategy instead, add the following to the connector configuration:
> {code:java}
> "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
> {code}


