[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301321#comment-17301321 ]
A. Sophie Blee-Goldman commented on KAFKA-12463: ------------------------------------------------ [~ChrisEgerton] just fyi, the CooperativeStickyAssignor was introduced in 2.4, not 2.3. Also, that upgrade protocol sounds correct but it should be covered in the KIP-429 document – did you check out the steps listed for Consumer upgrades in the section called "Compatibility and Upgrade Path"? [Link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer] If you had a hard time finding that info or found the instructions unclear, please lmk so I can try to improve the docs. > Update default consumer partition assignor for sink tasks > --------------------------------------------------------- > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Reporter: Chris Egerton > Assignee: Chris Egerton > Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. > This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." > We should alter the default consumer configuration for sink tasks to use the > new {{CooperativeStickyAssignor}}. In order to do this in a > backwards-compatible fashion that also enables rolling upgrades, this should > be implemented by setting the {{partition.assignment.strategy}} property of > sink task consumers to the list > {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, > org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or > connector-level override is present. > This way, consumer groups for sink connectors on Connect clusters in the > process of being upgraded will continue to use the {{RangeAssignor}} until > all workers in the cluster have been upgraded, and then will switch over to > the new {{CooperativeStickyAssignor}} automatically. > > This improvement is viable as far back as 2.3, when the > {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug > fix, should only be applied to the Connect framework in an upcoming minor > release. This does not preclude users from following the steps outlined here > to improve sink connector behavior on existing clusters by modifying their > worker configs to use {{consumer.partition.assignment.strategy =}} > {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, > org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a > per-connector basis using the > {{consumer.override.partition.assignment.strategy}} property. -- This message was sent by Atlassian Jira (v8.3.4#803005)