[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314151#comment-17314151 ]

Chris Egerton commented on KAFKA-12463:
---

Looks like this may be accomplished by [KIP-726|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248], which currently proposes that the default consumer partition assignment strategy be upgraded to {{["cooperative-sticky", "range"]}} for all consumer applications (including Connect).

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
> Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations, including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor, including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions.
>
> This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data.
>
> h3. Proposed Change
>
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work, as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.*
>
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters."
>
> As Connect and the tooling around it matures and automatic restarts of failed tasks become more popular, care should be taken to ensure that the consumer group churn created by restarting one or more tasks doesn't compromise the availability of other tasks by forcing them to temporarily yield up all of their partitions just to reclaim them after a rebalance has completed.
>
> With that in mind, we should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task:
>
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
>
> This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically.
>
> Importantly, this setting will only be a default, and any user-specified overrides either in the *worker config*:
>
> {code:java}
> consumer.partition.assignment.strategy=...
> {code}
>
> or in a *sink connector config* (when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}):
>
> {code:java}
> consumer.override.partition.assignment.strategy=...
> {code}
>
> will still take precedence.
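The override precedence described in the proposal (a Worker-supplied default that yields to any user-specified value) can be sketched as follows. The class and method names here are hypothetical, not the actual {{Worker}} code; only the config key and the assignor class names come from Kafka itself:

```java
import java.util.Properties;

public class SinkTaskConsumerDefaults {
    // Real Kafka config key and assignor class names; the surrounding
    // logic is an illustrative sketch, not Connect's implementation.
    static final String STRATEGY_KEY = "partition.assignment.strategy";
    static final String DEFAULT_STRATEGIES =
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
        + "org.apache.kafka.clients.consumer.RangeAssignor";

    // Apply the proposed default only when the user has not already
    // configured the property, mirroring the precedence in the issue.
    static Properties withDefaultAssignors(Properties userConfig) {
        Properties result = new Properties();
        result.putAll(userConfig);
        result.putIfAbsent(STRATEGY_KEY, DEFAULT_STRATEGIES);
        return result;
    }

    public static void main(String[] args) {
        // No user override: the cooperative-then-range default applies.
        System.out.println(
            withDefaultAssignors(new Properties()).getProperty(STRATEGY_KEY));

        // User override wins over the default.
        Properties overridden = new Properties();
        overridden.setProperty(STRATEGY_KEY,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(
            withDefaultAssignors(overridden).getProperty(STRATEGY_KEY));
    }
}
```

Listing the {{RangeAssignor}} second is what makes the rolling upgrade safe: consumers advertise both strategies, so mixed-version groups can still agree on range assignment.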
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308235#comment-17308235 ]

Chris Egerton commented on KAFKA-12463:
---

Thanks [~kkonstantine]. I'll admit I was hoping to avoid a KIP, as this seems like more of an implementation detail and not something most users would consider when using Connect, but the longer the conversation has gone on, the clearer it has become that this is a measure-twice, cut-once situation with multiple perspectives and tradeoffs that need to be taken into account. Regardless of whether a KIP is strictly required or not, I certainly agree it'd be best at this point to proceed with one, so I'll add the {{needs-kip}} tag and will hopefully get a design doc drafted and on the mailing list within the next week or two.
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306798#comment-17306798 ]

Konstantine Karantasis commented on KAFKA-12463:
---

Long and interesting history of comments already, and I admit I'll have to return soon to read them in more detail. Just a meta comment for now: I think that changing the default assignor will be a noticeable change in behavior in several circumstances. This makes it a user-facing change for users that run sink connectors, so I think it'd be good to highlight this change with a brief KIP. What do you think [~ChrisEgerton] [~rhauch]?
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304519#comment-17304519 ]

Chris Egerton commented on KAFKA-12463:
---

Thanks Randall.

{quote}I agree with you that we should fix the behavior. But the fix will appear only in certain releases, and not all users will be able to upgrade to those releases to get the fix. So, documenting the simple workaround will help users in those situations. I suggested documenting the workaround here to help any users that do stumble upon this issue when searching for a potential fix, regardless of whether the workaround is also documented elsewhere.{quote}

That's an interesting take... I guess the difference I perceive here is that this is a proposed improvement, not really a fix. I don't know if any users are going to stumble onto this after something breaks, since it doesn't address anything breaking to begin with. Either way, I guess we can try to iron out the workaround a little more here (will do now), but hopefully we can also put something in the public-facing docs for Connect (maybe as part of the upgrade notes, if we change the default consumer partition assignment strategy); that seems like it might reach more of the target audience here.

With regards to steps forward and your question, [~ableegoldman], I wasn't certain one way or the other about round robin vs. cooperative sticky assignment. I had a few thoughts:
* When the set of task configurations changes, the advantages of stickiness or the cooperative protocol are basically irrelevant, since each task and its accompanying consumer is brought down and a new one is brought up in its place.
* In a completely stable world where consumer assignment never changes, round robin would be ideal (out of the ones available right now), as it'd guarantee as-even-as-possible spread of partitions within the same topic across tasks.
* Otherwise, the only times the cooperative protocol might come into play are:
** When a consumer subscription is updated (which would cause a consumer rebalance but keep all sink tasks running)
** When a task is started or shut down without the connector being reconfigured (which may happen when a single task fails, a failed task is restarted, a new worker joins the group, or an existing worker leaves the group)
** When the consumer for a task falls out of the group (likely because the task is taking too long to process data provided to it by the framework)
* Under most of these scenarios, stickiness would provide no benefit, as any time a consumer is created, a new task is brought up in its place. The only exception is an update to a consumer subscription, but even that would require some changes to how Connect invokes [SinkTask::open|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-] and [SinkTask::close|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close-java.util.Collection-] to basically fake cooperative rebalancing in the way that's proposed in the [StickyAssignor docs|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html] under the "Impact on {{ConsumerRebalanceListener}}" section.

The benefits of the sticky assignor seem pretty slim, and would require some one-off tooling from Connect to basically re-implement the cooperative protocol. This is why I'm personally not in favor of it, but I'd love to learn more if there's something I'm missing here.

So with all that in mind, we can transform the question into whether it's important to favor any of the scenarios outlined above where cooperative rebalancing might be of some benefit, and if not, opt to use the round robin assignor.

There's one scenario that comes to mind that I think might be worth considering, and that counts in favor of using the cooperative sticky assignor: if there's any kind of tooling in place that restarts failed tasks automatically, there will be significant consumer churn as consumers rapidly fall out of and join the group. I think this scenario is going to become more and more common as adoption of Connect increases and both it and the tooling around it mature, and as a result, I'm gently in favor of trying to use the {{CooperativeStickyAssignor}} now, or at least when it becomes possible in the Connect framework once work on KAFKA-12477 and KAFKA-12487 completes.

I raised this point on the PR but it bears repeating here: we might want to reason through this carefully, since we only get one shot to do this kind of automated upgrade before the next one gets more complicated (the list of assignors will grow from one to two this time around; it'll either have to grow from two to three the next, or we'll have to risk breaking changes for users who skip an upgrade step). I raised the round robin assign
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304452#comment-17304452 ]

A. Sophie Blee-Goldman commented on KAFKA-12463:
---

Out of curiosity, why use the RoundRobinAssignor rather than the StickyAssignor if you want to avoid cooperative rebalancing? I'm not a Connect expert, so maybe there's some nuance here, but the StickyAssignor is generally preferred (we were planning to make it the default in 3.0 before the CooperativeStickyAssignor came along).
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304248#comment-17304248 ]

Randall Hauch commented on KAFKA-12463:
---

Thanks for the response and for logging KAFKA-12487, [~ChrisEgerton].

{quote}I'd also just like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal here shouldn't be to focus on documenting them but instead, to make them obsolete. If we decide not to improve the default behavior of Connect then we can document this somewhere else that's a little more visible for users as opposed to developers.{quote}

I agree with you that we should fix the behavior. But the fix will appear only in certain releases, and not all users will be able to upgrade to those releases to get the fix. So, documenting the simple workaround will help users in those situations. I suggested documenting the workaround here to help any users that do stumble upon this issue when searching for a potential fix, regardless of whether the workaround is also documented elsewhere.

IIUC, given that KAFKA-12487 is likely more challenging to fix and has not yet been addressed, the shorter-term proposal here is to change the worker to set the following on consumer configs used in sink connectors:

{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

This works because round robin will be used only after all workers have been upgraded, and it gives us more balanced consumer assignments. Plus, it is backward compatible, since the worker will always let users override this new value should they have any worker configs that override this property via:

{code:java}
consumer.partition.assignment.strategy=...
{code}

or have any connector configs that use client overrides via:

{code:java}
consumer.override.partition.assignment.strategy=...
{code}

If that is the proposal, WDYT about updating the description to make this more clear? Essentially, I suggest this issue's description should state the problem (that section is good), propose a solution using round robin (mostly reusing your proposed section, but with round robin rather than cooperative), document the workaround, and finally address why the {{RoundRobinAssignor}} was used instead of the {{CooperativeStickyAssignor}}. And if we're on the same page, then I think it's worth updating the PR to implement the proposed fix. I'll state the same on a review for the PR.
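Both proposals rely on the same mechanism: when a group rebalances, only a strategy supported by every member can be chosen, and each member effectively votes for its own most-preferred eligible strategy. The sketch below is a toy model of that selection (not Kafka's actual coordinator code) using the short protocol names "cooperative-sticky" and "range"; it shows why advertising both assignors keeps a mixed-version group on range assignment until the last worker is upgraded:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignorSelection {
    // Simplified model of group protocol selection: only strategies
    // supported by *every* member are eligible, and each member votes
    // for its own most-preferred eligible strategy.
    static String select(List<List<String>> memberPreferences) {
        Set<String> eligible = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            eligible.retainAll(new HashSet<>(prefs));
        }
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String strategy : prefs) {
                if (eligible.contains(strategy)) {
                    votes.merge(strategy, 1, Integer::sum);
                    break; // each member votes once, for its top eligible choice
                }
            }
        }
        return Collections.max(votes.entrySet(),
                Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        // Mid-upgrade: one old worker only supports range, so the group stays on it.
        System.out.println(select(List.of(
                List.of("cooperative-sticky", "range"),
                List.of("range"))));
        // Fully upgraded: every member prefers cooperative-sticky.
        System.out.println(select(List.of(
                List.of("cooperative-sticky", "range"),
                List.of("cooperative-sticky", "range"))));
    }
}
```

The same logic applies whether the preferred assignor is {{RoundRobinAssignor}} or {{CooperativeStickyAssignor}}; the list order expresses the preference, and the trailing {{RangeAssignor}} entry is the compatibility bridge.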
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302981#comment-17302981 ]

Chris Egerton commented on KAFKA-12463:
---

Fun fact: Connect does not like cooperative consumers. See https://issues.apache.org/jira/browse/KAFKA-12487 for details. Until/unless that's addressed, we can't change the default partition assignor for Connect to the {{CooperativeStickyAssignor}}.

[~rhauch] thanks for the rewrite; I hope that makes things easier to read. Responses inline:

{quote}Specifically, isn't it true that after the Connect worker config is changed, one rolling restart of the Connect workers will not immediately use the cooperative assignor (since the last worker to restart will only use the range assignor, the default), and that the cooperative assignor will only be used after a second rolling restart?{quote}

That's true if the second rolling restart includes a change to the partition assignment strategy for its consumers to only use the {{CooperativeStickyAssignor}} and remove the {{RangeAssignor}} from the list, yes. Given that and the recently-discovered KAFKA-12487, I don't think we're even at the point of updating the description to mention an upgrade process that would accommodate the {{CooperativeStickyAssignor}} without a warning notice preceding any hypothetical designs we might implement once both of these pain points are addressed. I've updated the description accordingly; feel free to make any edits, as long as we don't actually instruct users how to configure their workers with the {{CooperativeStickyAssignor}}, as that will lead to bad worker behavior.

{quote}In both standalone and distributed, Connect only updates the task configs if and only if the task configs changed. If a user *only* changes the {{consumer.override.partition.assignment.strategy}} property in a connector config, then the task configs will not be changed and the tasks will not be restarted.{quote}

Ah yes, KAFKA-9228. When I initially discovered that one I was a little alarmed, but it turns out that the overwhelming majority of connectors out there are unaffected, just by quirk of how people tend to implement {{Connector::taskConfigs}}. The only cases I've been able to find where this bug comes up are in the file stream connectors. If we believe this is likely to affect other connectors, I personally think we should address that bug instead of working around it or documenting it as a potential gotcha.

{quote}If Connect does attempt to restart the connector's tasks, the herder does not wait for the tasks to stop before starting any of them. This means that it may take several such connector & task restarts before the cooperative partition assignment strategy will take effect.{quote}

That's a fair point, and it applies to any change of partition assignment strategy, not just specifically moving from an eager to a cooperative one. This becomes especially likely if a task isn't able to respond to a shutdown request within the graceful shutdown period (which defaults to five seconds). The workaround here is to enable both partition assignment strategies for the consumer, with a preference for the desired strategy; that way, the desired strategy will take effect as soon as every consumer in the group has been updated, and nobody will break beforehand. I'll update the workaround section in the description to include that info.

I'd like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal shouldn't be to focus on documenting them but instead to make them obsolete. If we decide not to improve the default behavior of Connect, then we can document this somewhere else that's a little more visible for users as opposed to developers.
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302680#comment-17302680 ] Randall Hauch commented on KAFKA-12463: --- [~ChrisEgerton], I edited the description of this issue a bit to clarify the proposal versus the workaround, and formatted the config settings to make them more prominent. Given [~ableegoldman]'s comments about a double rolling upgrade being required to ensure the consumer group uses the cooperative assignor, WDYT about updating this issue's description to talk about rolling upgrades? Specifically, isn't it true that after the Connect worker config is changed, one rolling restart of the Connect workers will not immediately use the cooperative assignor (since the last worker to restart will only use the range assignor, the default), and that the cooperative assignor will only be used after a second rolling restart? Interestingly, IIUC, when a connector configuration is changed to specify the partition assignors in AK 2.4 or later, there are a few challenges to make this work:
# In both standalone and distributed, Connect updates the task configs if and only if the task configs changed. If a user *only* changes the {{consumer.override.partition.assignment.strategy}} property in a connector config, then the task configs will not be changed and the tasks will not be restarted.
# If Connect does attempt to restart the connector's tasks, the herder does not wait for the tasks to stop before starting any of them. This means that it may take several such connector & task restarts before the cooperative partition assignment strategy will take effect.
Seems like we should at least mention this in the workaround above. WDYT?
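The first challenge boils down to an equality check on generated task configs. A simplified stand-in for the herder logic (hypothetical helper names, not actual Connect code): a connector that builds its task configs from scratch, as the file stream connectors do, never emits the changed {{consumer.override.*}} property, so the old and new task configs compare equal and no restart is triggered.

```java
import java.util.List;
import java.util.Map;

public class TaskConfigComparisonSketch {
    // Hypothetical stand-in for the herder's check: tasks are only
    // restarted when the newly generated task configs differ from the
    // ones already stored.
    static boolean tasksNeedRestart(List<Map<String, String>> oldTaskConfigs,
                                    List<Map<String, String>> newTaskConfigs) {
        return !oldTaskConfigs.equals(newTaskConfigs);
    }

    public static void main(String[] args) {
        // A connector that generates minimal task configs emits the same
        // configs before and after a consumer.override.partition.assignment.strategy
        // change, because that property never appears in the generated task configs.
        List<Map<String, String>> before = List.of(Map.of("file", "/tmp/in.txt", "topic", "example-topic"));
        List<Map<String, String>> after = List.of(Map.of("file", "/tmp/in.txt", "topic", "example-topic"));
        System.out.println(tasksNeedRestart(before, after)); // false: no restart happens
    }
}
```

Connectors that copy their entire connector config into each task config are unaffected, since the changed override property shows up in the comparison and forces a restart.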
> Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. > This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > h3. 
Proposed Change > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." > We should alter the default consumer configuration for sink tasks to use the > new {{CooperativeStickyAssignor}}. In order to do this in a > backwards-compatible fashion that also enables rolling upgrades, this should > be implemented by changing the {{Worker}} to set the following on the > consumer configuration created for each sink connector task: > {code:java} > partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor > {code} > This way, consumer groups for sink connectors on Connect clusters in the > process of being upgraded will continue to use the {{RangeAssignor}} until > all workers in the cluster have been upgraded, and then will switch over to > the new {{CooperativeStickyAssignor}} automatically. > This improvement is viable as far back as -2.3- 2.4, when the > {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug > fix, should only be applied to the Connect framework in an upcoming minor > release. This does not preclude users from following the steps outlined here > to improve sink connector behavior on existing clusters by modifying their > worker configs to use {{consumer.partition.assignment.strategy =}} > {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, > org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a > per-connector basis using the > {{consumer.override.partition.assignment.strategy}} property.
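As a companion to the worker-level setting above, the per-connector override mentioned in the description might look like this in a connector config submitted to the Connect REST API. The connector name, class, and topic below are placeholders for illustration only:

```json
{
  "name": "example-sink",
  "config": {
    "connector.class": "org.example.ExampleSinkConnector",
    "topics": "example-topic",
    "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
  }
}
```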
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302225#comment-17302225 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Filed https://issues.apache.org/jira/browse/KAFKA-12477 – would be nice to target 3.0 here, both for this ticket and for https://issues.apache.org/jira/browse/KAFKA-12473
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302205#comment-17302205 ] Chris Egerton commented on KAFKA-12463: --- Haha, I should have been clearer: we do use our own type of coordinator for Connect, but that's for a separate thing. The [WorkerCoordinator|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java] is used by workers to assign connectors and tasks amongst themselves. For sink connectors, we create a consumer group, and each task is given a single consumer in that group (the [WorkerSinkTask|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java] class contains a lot of this logic). This is the type of consumer group whose assignment logic I think we should improve in Connect. You've laid out the issues with using any kind of cooperative assignor with the consumer pretty well. If we can improve the consumer rebalance logic to enable a one-bounce upgrade to the cooperative protocol, that'd be fantastic; in the meantime, it may be worth switching to another assignor that, while not cooperative, is still friendlier to sink connectors that consume from a large number of low-partition topics.
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302202#comment-17302202 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Maybe we could improve things on the Consumer side to avoid the need for a second rolling bounce, though. Haven't thought through all the details here, but off the top of my head, it seems like we can actually choose the rebalancing protocol based on the chosen assignor after the first rebalance. The very first rebalance after a consumer is bounced would require selecting the EAGER protocol, but that hardly matters, since after restarting the consumer won't have any partitions to revoke to begin with. After a successful rebalance, the ConsumerCoordinator is informed that the group coordinator selected the CooperativeStickyAssignor, which of course indicates that all members have been upgraded to the new byte code and support cooperative rebalancing. At that point we can safely drop the EAGER protocol and choose the highest-supported protocol of the CooperativeStickyAssignor, which is of course the COOPERATIVE protocol.
tl;dr: we don't need to stick with the same rebalancing protocol that gets selected in the ConsumerCoordinator's constructor; why not adapt to new information as it comes in?
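The proposal above can be sketched with simplified stand-in types (this is an illustration of the idea, not the actual ConsumerCoordinator code): start on EAGER, then, once the group coordinator's chosen assignor is known, adopt that assignor's highest-supported protocol for subsequent rebalances.

```java
import java.util.Comparator;
import java.util.List;

public class ProtocolUpgradeSketch {
    // Simplified stand-ins for the real consumer-side types.
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    record Assignor(String name, List<RebalanceProtocol> supportedProtocols) {}

    // The first rebalance after a bounce uses EAGER; there are no owned
    // partitions to revoke at that point, so this costs nothing.
    static RebalanceProtocol currentProtocol = RebalanceProtocol.EAGER;

    // After a successful rebalance the selected assignor is known; since
    // the whole group must support it, its highest-supported protocol can
    // be adopted for the next rebalance.
    static void onAssignorSelected(Assignor selected) {
        currentProtocol = selected.supportedProtocols().stream()
                .max(Comparator.naturalOrder())
                .orElse(RebalanceProtocol.EAGER);
    }

    public static void main(String[] args) {
        System.out.println(currentProtocol); // EAGER

        // The group coordinator picked the cooperative-sticky assignor, which
        // implies every member runs byte code that supports COOPERATIVE.
        onAssignorSelected(new Assignor("cooperative-sticky",
                List.of(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE)));
        System.out.println(currentProtocol); // COOPERATIVE
    }
}
```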
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302192#comment-17302192 ] A. Sophie Blee-Goldman commented on KAFKA-12463: I think it still has the ConsumerCoordinator somewhere in the mix, though it would be good to understand exactly where/how that gets used within Connect. I took a quick look at the code, and a Connect Worker will instantiate a KafkaConsumer, which in turn will always have a ConsumerCoordinator and therefore a KIP-429-style EAGER vs COOPERATIVE protocol. (And yes, the KafkaConsumer in Connect uses group management.)
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302191#comment-17302191 ] Guozhang Wang commented on KAFKA-12463: --- Connect does not use the ConsumerCoordinator but implements its own ConnectCoordinator. I vaguely remember that while working on KIP-429 I looked at the Connect code and concluded that Kafka Connect does not require a second rolling bounce; this is what I wrote down:
{code}
Note the difference with KIP-415 here: since on consumer we do not have the luxury to leverage on list of built-in assignors since it is user-customizable and hence would be black box to the consumer coordinator, we'd need two rolling bounces instead of one rolling bounce to complete the upgrade, whereas Connect only need one rolling bounce.
{code}
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302183#comment-17302183 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Now I don't know if Connect actually uses the ConsumerCoordinator or not. I've seen that there is a Connect-specific implementation of the AbstractCoordinator, called the WorkerCoordinator, so I would assume not. But I vaguely recall someone from Connect telling me once that you do still rely on the ConsumerCoordinator somewhere/somehow, in which case I would think you need the double rolling bounce as well. That said, do you even need the CooperativeStickyAssignor? Doesn't Connect have its own version of cooperative rebalancing? Can't you just use the normal (EAGER) StickyAssignor? I really don't have much insight into how the Consumer client is embedded in Connect, so maybe the consumer-level cooperative protocol would still be helpful on top of whatever Connect does.
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302179#comment-17302179 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Wait, no, I double-checked in the code and you actually do need the two rolling bounces for the plain Consumer client. You can't put the CooperativeStickyAssignor first in the list; it has to be at a lower priority during the first rolling bounce. The reason is that we choose which rebalancing protocol to follow based on the first assignor in the list, regardless of whether that assignor is actually chosen for the rebalance or even supported by all consumers. This happens inside the ConsumerCoordinator constructor, so we don't know anything about the other consumers at that point. And we need to know the protocol before a rebalance starts, so that we can revoke all partitions in the EAGER case, which is why we can't just wait until an assignor is chosen during the rebalance.
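(A toy model of the behavior described in this comment — Python pseudocode, not the actual Java in ConsumerCoordinator, and the protocol table is simplified — showing why the list order forces two bounces:)

```python
# Model of the comment above: each consumer picks its rebalance protocol
# locally, from the FIRST assignor in partition.assignment.strategy,
# before it knows anything about the rest of the group. So during a
# mixed-version rolling bounce, an EAGER assignor must stay at the head.

EAGER, COOPERATIVE = "eager", "cooperative"

# Protocol implied by each built-in assignor (simplified).
ASSIGNOR_PROTOCOL = {
    "range": EAGER,
    "sticky": EAGER,
    "cooperative-sticky": COOPERATIVE,
}

def protocol_for(configured_assignors):
    """Mimics the ConsumerCoordinator constructor: the protocol is decided
    from the head of the configured list, not negotiated with the group."""
    return ASSIGNOR_PROTOCOL[configured_assignors[0]]

# First rolling bounce: cooperative-sticky added at LOWER priority, so
# every member still follows the EAGER protocol (revoke-all on rebalance).
assert protocol_for(["range", "cooperative-sticky"]) == EAGER

# Second rolling bounce: cooperative-sticky promoted to the head, so
# members switch to the COOPERATIVE protocol.
assert protocol_for(["cooperative-sticky"]) == COOPERATIVE
```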
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302167#comment-17302167 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Isn't the upgrade path for Connect identical to the one for the Consumer? There's no mention of Connect in KIP-429 because that KIP focuses on cooperative rebalancing for the plain Consumer and Kafka Streams, while Connect had its own separate KIP and protocol for incremental cooperative rebalancing. But if Connect relies on the same underlying consumer group protocol, then it should apply as well. That said, I just re-read the Consumer upgrade section and it contains some misinformation: it seems to imply that if you have mixed assignor protocols, you can end up with some consumers choosing an EAGER assignor while others choose the COOPERATIVE one, which is just not true. Fake News! Anyways, thanks for raising this; I'll go clean it up. That's also a good point about one vs. two rolling bounces: technically you don't _need_ the second rolling bounce if you listed the CooperativeStickyAssignor first in the preferred list on the initial rolling bounce. I guess the intention was to leave the consumer group in a stable state, so that if, for example, you deployed a new consumer without explicitly configuring it to use the CooperativeStickyAssignor, it would fail rather than silently switch back to EAGER rebalancing. But it's not required. I'll go clarify that in the KIP as well.
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301706#comment-17301706 ] Chris Egerton commented on KAFKA-12463: --- cc [~rhauch], what do you think about this?
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301327#comment-17301327 ] Chris Egerton commented on KAFKA-12463: --- Ah, thanks [~ableegoldman], I'd misread the javadocs for the cooperative sticky assignor. Re: clarity of the upgrade section in KIP-429: I didn't see a specific section for Connect, and both of the sections that were provided ("Consumer" and "Streams") described a different procedure from the one I proposed here. It seems like an implicit goal of both of them is to arrive at an end state where all consumers provide only the cooperative assignor in their list of supported assignors, instead of the cooperative assignor first with the other, older assignor behind it. I'm wondering if the absence of that goal is why this different approach (which only requires a one-step roll as opposed to two) is viable here but not necessarily for other applications?
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301321#comment-17301321 ] A. Sophie Blee-Goldman commented on KAFKA-12463: [~ChrisEgerton] just FYI, the CooperativeStickyAssignor was introduced in 2.4, not 2.3. Also, that upgrade protocol sounds correct, but it should be covered in the KIP-429 document. Did you check out the steps listed for Consumer upgrades in the section called "Compatibility and Upgrade Path"? [Link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer] If you had a hard time finding that info or found the instructions unclear, please let me know so I can try to improve the docs.
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301235#comment-17301235 ] Chris Egerton commented on KAFKA-12463: --- [~bchen225242] can you confirm the upgrade semantics outlined here? I think the Kafka group coordinator should force every consumer in the group to use range assignment until every consumer in the group is configured to use both the cooperative sticky assignor and the range assignor (in that order), at which point the group would automatically switch over to cooperative sticky assignment. I want to make sure, though, since this upgrade path isn't covered in KIP-429.
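(A rough model of the group-side selection being asked about here — an illustrative Python sketch, not the broker's actual protocol-election code, with tie-breaking simplified to the first member's preference order:)

```python
# Model of the semantics described above: the group can only settle on an
# assignor that EVERY member advertises, so a mid-upgrade group with one
# old worker keeps using "range"; once all members also list
# "cooperative-sticky", it becomes eligible and is preferred.

def select_assignor(member_preferences):
    """Pick the first assignor (by preference order) supported by every
    member of the group. Simplified stand-in for the broker's selection."""
    common = set.intersection(*(set(prefs) for prefs in member_preferences))
    # Tie-break by the first member's preference order (a simplification).
    for candidate in member_preferences[0]:
        if candidate in common:
            return candidate
    raise RuntimeError("no common assignor: the rebalance would fail")

upgraded = ["cooperative-sticky", "range"]
old = ["range"]

# Mid-upgrade: one worker still has the old default, so the whole group
# falls back to range assignment.
assert select_assignor([upgraded, upgraded, old]) == "range"

# Fully upgraded: every member lists cooperative-sticky first, so the
# group switches over automatically.
assert select_assignor([upgraded, upgraded, upgraded]) == "cooperative-sticky"
```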
-- This message was sent by Atlassian Jira (v8.3.4#803005)