[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302680#comment-17302680 ]
Randall Hauch edited comment on KAFKA-12463 at 3/16/21, 4:39 PM:
-----------------------------------------------------------------

[~ChrisEgerton], I edited the description of this issue a bit to clarify the proposal versus the workaround, and formatted the config settings to make them more prominent.

Given [~ableegoldman]'s comments about a double rolling upgrade being required to ensure the consumer group uses the cooperative assignor, WDYT about updating this issue's description to talk about rolling upgrades? Specifically, isn't it true that after the Connect worker config is changed, one rolling restart of the Connect workers will not immediately result in the cooperative assignor being used (since the last worker to restart will still be using only the range assignor, the default), and that the cooperative assignor will only be used after a second rolling restart?

Interestingly, IIUC, when a connector configuration is changed to specify the partition assignors in AK 2.4 or later, there are a few challenges to making this work (see the sketch below):
# In both standalone and distributed mode, Connect updates the task configs if and only if the task configs have changed. If a user *only* changes the {{consumer.override.partition.assignment.strategy}} property in a connector config, then the task configs will not be changed and the tasks will not be restarted.
# If Connect does attempt to restart the connector's tasks, the herder does not wait for the tasks to stop before starting any of them. This means that it may take several such connector & task restarts before the cooperative partition assignment strategy takes effect.

Seems like we should at least mention this in the workaround above. WDYT? Should we also recommend setting the partition assignment strategy in the worker config rather than in connector configs?
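To make the first challenge concrete, here is a minimal sketch of the kind of compare-before-restart check involved; {{shouldRestartTasks}} and the surrounding class are hypothetical names for illustration, not the actual Connect herder code:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TaskConfigUpdateSketch {

    // Hypothetical illustration: herders persist new task configs and restart
    // tasks only when the freshly generated configs differ from the stored
    // ones. A connector-config change that only touches consumer.override.*
    // properties can leave the generated task configs identical, in which case
    // this check returns false and no task restart happens.
    static boolean shouldRestartTasks(List<Map<String, String>> oldTaskConfigs,
                                      List<Map<String, String>> newTaskConfigs) {
        return !Objects.equals(oldTaskConfigs, newTaskConfigs);
    }
}
{code}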
> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations, including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor, including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. As a worst-case example, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data.
> h3. Proposed Change
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically.
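For illustration, a minimal sketch of the proposed {{Worker}} change; {{applyDefaultAssignors}} is a hypothetical helper rather than the actual patch, but the config key and assignor classes come from the public Kafka clients API:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class SinkConsumerDefaultsSketch {

    // Default to cooperative-sticky with range as a fallback, but only when
    // the user has not already set partition.assignment.strategy, so
    // worker-level and per-connector overrides still take precedence.
    static void applyDefaultAssignors(Map<String, Object> consumerProps) {
        consumerProps.putIfAbsent(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
    }
}
{code}

Listing {{RangeAssignor}} second is what keeps the group functional mid-upgrade: the group coordinator only selects an assignor that every member advertises.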
> But, this setting will be overwritten by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.
> This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release.
> h3. Manually setting the partition assignment strategy
> There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that don't also include this fix: either set the following in the *worker configuration*:
> {code:java}
> consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> or set the following in *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}:
> {code:java}
> consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
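The same assignor list can be sanity-checked outside of Connect with a plain consumer. A minimal sketch, assuming a broker at {{localhost:9092}} and a hypothetical group id and topic:

{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CooperativeFallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-my-sink");         // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Same strategy list as the workaround: the group coordinator only
        // selects an assignor supported by all members, so the group keeps
        // using the range assignor until every member also advertises
        // cooperative-sticky.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic")); // hypothetical topic
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}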