[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304519#comment-17304519 ]

Chris Egerton commented on KAFKA-12463:
---------------------------------------

Thanks, Randall.
{quote}I agree with you that we should fix the behavior. But the fix will 
appear only in certain releases, and not all users will be able to upgrade to 
those releases to get the fix. So, documenting the simple workaround will help 
users in those situations. I suggested documenting the workaround here to help 
any users that do stumble upon this issue when searching for a potential fix, 
regardless of whether the workaround is also documented elsewhere.
{quote}
That's an interesting take... I guess the difference I perceive here is that 
this is a proposed improvement, not really a fix. I don't know that many users 
will stumble onto this after something breaks, since it doesn't address 
anything breaking to begin with. Either way, we can try to iron out the 
workaround a little more here (will do now), but hopefully we can also put 
something in the public-facing docs for Connect (maybe as part of the upgrade 
notes if we change the default consumer partition assignment strategy); that 
seems more likely to reach the target audience.

 

With regards to steps forward and your question, [~ableegoldman], I wasn't 
certain one way or the other about round robin vs cooperative sticky 
assignment. I had a few thoughts:
 * When the set of task configurations changes, the advantages of stickiness or 
the cooperative protocol are basically irrelevant, since each task and its 
accompanying consumer are brought down and new ones are brought up in their 
place.
 * In a completely stable world where consumer assignment never changes, round 
robin would be ideal (out of the ones available right now) as it'd guarantee 
as-even-as-possible spread of partitions within the same topic across tasks.
 * Otherwise, the only times the cooperative protocol might come into play are:
 ** When a consumer subscription is updated (which would cause a consumer 
rebalance but keep all sink tasks running)
 ** When a task is started or shut down without the connector being 
reconfigured (which may happen when a single task fails, a failed task is 
restarted, a new worker joins the group, or an existing worker leaves the group)
 ** When the consumer for a task falls out of the group (likely because the 
task is taking too long to process data provided to it by the framework).
 * Under most of these scenarios, stickiness would provide no benefit, since 
any time a new consumer is created, it belongs to a freshly started task with 
no prior assignment to stick to. The only exception is an update to a consumer 
subscription, but even that would require some changes to how Connect invokes 
[SinkTask::open|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-]
 and 
[SinkTask::close|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close-java.util.Collection-]
 to basically fake cooperative rebalancing in the way that's proposed in the 
[StickyAssignor 
docs|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html]
 under the "Impact on {{ConsumerRebalanceListener}}" section.
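To make the eager-vs-cooperative distinction above concrete, here's a toy model 
(plain Python, not Kafka client or Connect code; the round-robin-style 
{{assign}} function is just a stand-in) comparing how many partitions get 
revoked when a third member joins a two-member group:

```python
# Toy model of eager vs. cooperative rebalancing (NOT the Kafka client
# implementation). Eager: every member revokes everything before the new
# assignment is computed. Cooperative: members only revoke partitions that
# actually move to another owner, so the rest keep being processed.

def assign(members, partitions):
    """Stand-in assignor: spread sorted partitions round-robin over members."""
    members, partitions = sorted(members), sorted(partitions)
    assignment = {m: [] for m in members}
    for i, p in enumerate(partitions):
        assignment[members[i % len(members)]].append(p)
    return assignment

def eager_rebalance(old, members, partitions):
    # Everything is revoked up front, regardless of where it ends up.
    revoked = {m: list(old.get(m, [])) for m in members}
    return revoked, assign(members, partitions)

def cooperative_rebalance(old, members, partitions):
    new = assign(members, partitions)
    # Only partitions that change owners are revoked.
    revoked = {m: [p for p in old.get(m, []) if p not in new[m]]
               for m in members}
    return revoked, new

partitions = ["topic-0", "topic-1", "topic-2", "topic-3"]
old = assign(["task-0", "task-1"], partitions)
members = ["task-0", "task-1", "task-2"]  # a third task's consumer joins

eager_revoked, _ = eager_rebalance(old, members, partitions)
coop_revoked, _ = cooperative_rebalance(old, members, partitions)
```

Under the eager protocol all four partitions are revoked; under the cooperative 
protocol only the two that actually move are.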

The benefits of the sticky assignor seem pretty slim, and realizing them would 
require some one-off tooling in Connect to basically re-implement the 
cooperative protocol. This is why I'm personally not in favor of it, but I 
would love to learn more if there's something I'm missing here.

So with all that in mind, we can transform the question into whether it's 
important to favor any of the scenarios outlined above where cooperative 
rebalancing might be of some benefit, and if not, opt for the round robin 
assignor. One scenario comes to mind that I think is worth considering and 
counts in favor of the cooperative sticky assignor: if there's any kind of 
tooling in place that restarts failed tasks automatically, there will be 
significant consumer churn as consumers rapidly fall out of and rejoin the 
group.

I think this scenario is going to become more and more common as adoption of 
Connect increases and both it and the tooling around it mature, and as a 
result, I'm gently in favor of trying to use the {{CooperativeStickyAssignor}} 
now, or at least, when it becomes possible in the Connect framework once work 
on KAFKA-12477 and KAFKA-12487 completes.

I raised this point on the PR but it bears repeating here: we might want to 
reason through this carefully, since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; next time it will either 
have to grow from two to three, or we'll have to risk breaking changes for 
users who skip an upgrade step). I raised the round robin assignor as a 
fallback in case the cooperative sticky assignor turns out to be impossible or 
at least infeasible, but I'm optimistic that we can do things right the first 
time around and work quickly enough to land the cooperative sticky assignor in 
time.

 

TL;DR:
 # Cooperative sticky assignor seems best because it'll make sink connectors 
more resilient to failures, especially when automated tooling is available to 
restart failed tasks.
 # It may be possible to do this smoothly across one release if we can take 
care of KAFKA-12477, which is being actively worked on at the moment.
 # It's better to wait a little longer to upgrade to a new default assignor, 
since we can't do this very often, and we should be fairly certain that 
whatever assignor we opt for will be the best one for a wide range of users and 
do the least harm.

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. As a worst-case example: in a consumer group where every member 
> is subscribed to ten topics that each have one partition, only one member of 
> that group will be assigned any topic partitions.
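That worst case is easy to reproduce with a simplified model of the range 
assignor's per-topic logic (plain Python for illustration, not the real 
{{RangeAssignor}} implementation): each topic is divided into contiguous 
ranges over the sorted consumers independently, so every single-partition 
topic lands on the first consumer.

```python
# Simplified model of range assignment (NOT the actual RangeAssignor code):
# for each topic independently, sorted consumers receive contiguous ranges
# of that topic's partitions, with earlier consumers taking the remainder.

def range_assign(consumers, partitions_per_topic):
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    for topic, num_partitions in sorted(partitions_per_topic.items()):
        base = num_partitions // len(consumers)
        extra = num_partitions % len(consumers)
        start = 0
        for i, c in enumerate(consumers):
            count = base + (1 if i < extra else 0)
            assignment[c].extend(f"{topic}-{p}"
                                 for p in range(start, start + count))
            start += count
    return assignment

# Ten topics with one partition each, three consumers:
topics = {f"topic{i}": 1 for i in range(10)}
result = range_assign(["consumer-0", "consumer-1", "consumer-2"], topics)
```

In this model {{consumer-0}} receives all ten partitions while the other two 
consumers sit idle, which is exactly the sink-task idling described below.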
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically. But, this setting will 
> be overwritten by any user-specified 
> {{consumer.partition.assignment.strategy}} property in the worker 
> configuration, and by any user-specified 
> {{consumer.override.partition.assignment.strategy}} property in a sink 
> connector configuration when per-connector client overrides are enabled in the 
> worker config with {{connector.client.config.override.policy=ALL}}.
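The rolling-upgrade behavior described above follows from how a consumer group 
settles on a strategy: roughly speaking, the group uses the most-preferred 
strategy that every member supports (the real coordinator uses a vote among 
members, but with identical preference lists the outcome is the same). A toy 
model of that selection, with illustrative class names only:

```python
# Toy model of group protocol selection (simplified; NOT the real group
# coordinator logic): the group adopts the first strategy in the preference
# list that every member supports.

def select_strategy(preference_lists):
    supported_by_all = set.intersection(*(set(p) for p in preference_lists))
    for strategy in preference_lists[0]:
        if strategy in supported_by_all:
            return strategy
    raise RuntimeError("no common partition assignment strategy")

UPGRADED = ["CooperativeStickyAssignor", "RangeAssignor"]  # new worker default
OLD_DEFAULT = ["RangeAssignor"]                            # pre-upgrade worker

# Mid-upgrade: one worker still only supports range, so the group stays there.
mid_upgrade = select_strategy([UPGRADED, UPGRADED, OLD_DEFAULT])
# Once every worker lists the cooperative assignor first, it takes over.
fully_upgraded = select_strategy([UPGRADED, UPGRADED, UPGRADED])
```

This is why listing both assignors keeps the group on {{RangeAssignor}} until 
the last worker is upgraded, after which it switches over automatically.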
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release.
> h3. Manually setting the partition assignment strategy
> There is a simple workaround to achieve the same behavior in AK releases 2.4 
> and later that don't also include this improvement: either set a value for 
> the {{consumer.partition.assignment.strategy}} property in the *worker 
> configuration*, or set a value for the 
> {{consumer.override.partition.assignment.strategy}} property in one or more 
> *connector configurations* when per-connector client overrides are enabled in 
> the worker config with {{connector.client.config.override.policy=ALL}}.
> In order to avoid task failures while the connector is being reconfigured, it 
> is highly recommended that the consumer be configured with a list of both the 
> new and the current partition assignment strategies, instead of just the new 
> partition assignment strategy. For example, to update a connector configured 
> to use the {{RangeAssignor}} strategy to use the {{RoundRobinAssignor}} 
> strategy instead, add the following to the connector configuration:
> {code:java}
> "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
> {code}
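For the worker-level variant of the workaround, the corresponding line in the 
worker properties file would look something like the following (same 
round-robin-plus-range example; note that this applies to the consumers of 
every sink connector on the worker):

```properties
# Worker configuration (e.g. connect-distributed.properties).
# List the new strategy first and the current one second so consumers can
# rebalance safely while the change rolls out.
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
```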



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
