Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Thanks Andrew for your feedback and interest in this feature. If there is no further feedback on this KIP (and no objection) I'll start the voting process soon. Thanks. --Vahid From: Andrew Coates To: dev@kafka.apache.org Date: 08/10/2016 12:38 AM Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy I'm still very interested in seeing this KIP progress ... On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian wrote: > I would like to revive this thread and ask for additional feedback on this > KIP. > > There have already been some feedback, mostly in favor, plus some concern > about the value gain considering the complexity and the semantics; i.e. > how the eventually revoked assignments need to be processed in the > onPartitionsAssigned() callback, and not in onPartitionsRevoked(). > > If it helps, I could also send a note to users mailing list about this KIP > and ask for their feedback. > I could also put the KIP up for a vote if that is expected at this point. > > Thanks. > --Vahid > > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
I'm still very interested in seeing this KIP progress ... On Tue, 2 Aug 2016 at 20:09, Vahid S Hashemian wrote: > I would like to revive this thread and ask for additional feedback on this > KIP. > > There have already been some feedback, mostly in favor, plus some concern > about the value gain considering the complexity and the semantics; i.e. > how the eventually revoked assignments need to be processed in the > onPartitionsAssigned() callback, and not in onPartitionsRevoked(). > > If it helps, I could also send a note to users mailing list about this KIP > and ask for their feedback. > I could also put the KIP up for a vote if that is expected at this point. > > Thanks. > --Vahid > > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
I would like to revive this thread and ask for additional feedback on this KIP. There has already been some feedback, mostly in favor, plus some concern about the value gain considering the complexity and the semantics; i.e. how the eventually revoked assignments need to be processed in the onPartitionsAssigned() callback, and not in onPartitionsRevoked(). If it helps, I could also send a note to the users mailing list about this KIP and ask for their feedback. I could also put the KIP up for a vote if that is expected at this point. Thanks. --Vahid
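To make the callback semantics above concrete, here is a minimal Java sketch of the restructuring being discussed, written against the standard ConsumerRebalanceListener interface: offsets are committed in onPartitionsRevoked(), while cleanup of partition state waits until onPartitionsAssigned(), where the consumer can compare the new assignment with what it previously owned. The cleanUpLocalState() hook is a hypothetical application callback, not part of any Kafka API.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StickyAwareListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final Set<TopicPartition> owned = new HashSet<>();

    public StickyAwareListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Only commit offsets here; do not drop local state yet, because with a
        // sticky assignor most of these partitions may come straight back.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Clean up state only for partitions that were truly lost in the rebalance.
        Set<TopicPartition> lost = new HashSet<>(owned);
        lost.removeAll(partitions);
        for (TopicPartition tp : lost) {
            cleanUpLocalState(tp);   // hypothetical application hook
        }
        owned.clear();
        owned.addAll(partitions);
    }

    private void cleanUpLocalState(TopicPartition tp) {
        // e.g. evict caches or delete temporary files kept for this partition
    }
}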
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Guozhang, Thanks for the reference. A similar question was asked earlier about whether, with the sticky assignor, consumers stick to their previous partitions if they die and come back later. Currently the sticky assignor does not support that because it only preserves the last assignment before the rebalance. If a consumer dies and comes back during different rebalance intervals there is no guarantee it would get its previous partitions. If the community sees this as an important requirement for the sticky assignor we can definitely include it in the KIP. Regards, - Vahid Hashemian, Ph.D. Advisory Software Engineer, IBM Cloud Email: vahidhashem...@us.ibm.com Phone: 1-408-463-2380 IBM Silicon Valley Lab 555 Bailey Ave. San Jose, CA 95141 From: Guozhang Wang To: "dev@kafka.apache.org" Date: 06/23/2016 03:28 PM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Just adding some related reference here: Henry Cai is contributing some advanced feature in Kafka Streams regarding static assignment: https://github.com/apache/kafka/pull/1543 The main motivation is that when you do rolling bounce for upgrading your Kafka Streams code, for example, you would prefer to not move assigned partitions of the current bouncing instance to others, and today it is worked around by increasing the session.timeout; but what is more tricky is that when the bouncing instance comes back, it will still trigger a rebalance. The idea is that as long as we can encode the previous iteration's assignment map, and we can check that the list of partitions / members does not change regarding to their previous assigned partitions, we keep the assigned as is. Guozhang On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates wrote: > Hey Jason, > > Good to know on the round robin assignment. I'll look into that. > > The issue I have with the current rebalance listener is that it's not > intuitive and unnecessarily exposes the inner workings of rebalance logic. > When the onPartitionsRevoked method is called it's not really saying the > partitions were revoked. It's really saying a rebalance is happening and > you need to deal with any in-flight partitions & commit offsets. So maybe > the method name is wrong! Maybe it should be 'onRebalance' or > 'commitOffsets'..? Then the interface could also have an > onPartitionsRevoked method that is only called when partitions have been > revoked and given to someone else to handle, rather than just kind of > paused while we rebalance... maybe the new method could be > onPausePartitions? > > Andy > > On Thu, 23 Jun 2016, 18:06 Jason Gustafson, wrote: > > > Hey Andy, > > > > Thanks for jumping in. A couple comments: > > > > In addition, I think it is important that during a rebalance consumers do > > > not first have all partitions revoked, only to have a very similar, (or > > the > > > same!), set reassigned. This is less than initiative and complicates > > client > > > code unnecessarily. Instead, the `ConsumerPartitionListener` should > only > > be > > > called for true changes in assignment I.e. any new partitions assigned > > and > > > any existing ones revoked, when comparing the new assignment to the > > > previous one. > > > > > > The problem is that the revocation callback is called before you know > what > > the assignment for the next generation will be. This is necessary for the > > consumer to be able to commit offsets for its assigned partitions. 
Once > the > > consumer has a new assignment, it is no longer safe to commit offsets > from > > the previous generation. Unless sticky assignment can give us some > > guarantee on which partitions will remain after the rebalance, all of > them > > must be included in the revocation callback. > > > > > > > There is one last scenario I'd like to highlight that I think the KIP > > > should describe: say you have a group consuming from two topics, each > > topic > > > with two partitions. As of 0.9.0.1 the maximum number of consumers you > > can > > > have is 2, not 4. With 2 consumers each will get one partition from > each > > > topic. A third consumer with not have any partitions assigned. This > > should > > > be fixed by the 'fair' part of the strategy, but it would be good to > see > > > this covered explicitly in the KIP. > > > > > > This would be true for range assignment, but with 4 partitions total, > > round-robin assignment would give one partition to each of the 4 > consum
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, Thanks for the thoughtful comments. Please see my response below. BTW, I have been trying to update the KIP with some of the recent discussions on the mailing list. Regards, --Vahid From: Jason Gustafson To: dev@kafka.apache.org Date: 06/27/2016 12:53 PM Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hey Vahid, Comments below: I'm not very clear on the first part of this paragraph. You could clarify > it for me, but in general balancing out the partitions across consumers in > a group as much as possible would normally mean balancing the load within > the cluster, and that's something a user would want to have compared to > cases where the assignments and therefore the load could be quite > unbalanced depending on the subscriptions. I'm just wondering what kind of use cases require differing subscriptions in a steady state. Usually we expect all consumers in the group to have the same subscription, in which case the balance provided by round robin should be even (in terms of the number of assigned partitions). The only case that comes to mind is a rolling upgrade scenario in which the consumers in the group are restarted one by one with an updated subscription. It would be ideal to provide better balance in this situation, but once the upgrade finishes, the assignment should be balanced again, so it's unclear to me how significant the gain is. On the other hand, if there are cases which require differing subscriptions in a long term state, it would make this feature more compelling. I agree that if we care only about a balanced assignment with same subscriptions the round robin assignment is a good choice. But if we bring in stickiness to the mix it won't be guaranteed by the round robin assignor. An example (as Andrew mentioned in his earlier note) is elastic consumers that come and go automatically depending on the load and how much they lag behind. If these consumer maintain state of the partitions they consume from it would be reasonable to want them to stick to their assigned partitions, rather than having to repeat partition cleanup every time the number of consumers changes due to an increase or decrease in load. I'll also think about it and let you know if I come up with a use case with differing subscriptions. If differing subscriptions turns out not to be a common use case, the design and implementation of the sticky assignor could be modified to a far less complex setting so that fairness/stickiness can be guaranteed for same subscriptions. As I mentioned before, the current design / implementation is comprehensive and can be tweaked towards a less complex solution if further assumptions can be made. Since the new consumer is single threaded there is no such problem in its > round robin strategy. It simply considers consumers one by one for each > partition assignment, and when one consumer is assigned a partition, the > next assignment starts with considering the next consumer in the list (and > not the same consumer that was just assigned). This removes the > possibility of the issue reported in KAFKA-2019 surfacing in the new > consumer. In the sticky strategy we do not have this issue either, since > every time an assignment is about to happen we start with the consumer > with least number of assignments. So we will not have a scenario where a > consumer is repeated assigned partitions as in KAFKA-2019 (unless that > consumer is lagging behind other consumers on the number of partitions > assigned). Thanks for checking into this. 
I think the other factor is that the round robin assignor sorts the consumers using the id given them by the coordinator, which at the moment looks like this: "{clientId}-{uuid}". So if the group uses a common clientId, then it shouldn't usually be the case that two consumers on the same host get ordered together. We could actually change the order of these fields in a compatible way if we didn't like the dependence on the clientId. It seems anyway that the sticky assignor is not needed to deal with this problem. That's correct, and thanks for going into the issue in more details. Even though consumer groups are usually stable, it might be the case that > consumers do not initially join the group at the same time. The sticky > strategy in that situation lets those who joined earlier stick to their > partitions to some extent (assuming fairness take precedence over > stickiness). In terms of specific use cases, Andrew touched on examples of > how Kafka can benefit from a sticky assignor. I could add those to the KIP > if you also think they help building the case in favor of sticky assignor. > I agree with you about the downside and I'll make sure I add that to the > KIP as you suggested. Yep, I agree that it helps in some situat
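As a small aside on the member-id ordering point discussed above, the sketch below shows why a shared clientId avoids clustering consumers from one host: with ids of the form "{clientId}-{uuid}", the random uuid suffix decides the lexicographic order the round robin assignor uses. The sample ids are made up for illustration.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class MemberOrderingExample {
    public static void main(String[] args) {
        // Member ids as handed out by the coordinator: "{clientId}-{uuid}".
        // With a shared clientId, the random uuid decides the sort order, so two
        // consumers started on the same host are unlikely to sort next to each other.
        List<String> memberIds = new ArrayList<>();
        memberIds.add("my-client-" + UUID.randomUUID()); // consumer 1, host A
        memberIds.add("my-client-" + UUID.randomUUID()); // consumer 2, host A
        memberIds.add("my-client-" + UUID.randomUUID()); // consumer 3, host B
        Collections.sort(memberIds);                     // the ordering round robin relies on
        for (String id : memberIds) {
            System.out.println(id);
        }
    }
}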
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
deal if we could simply skip the call to onRevoked() if the partitions remain assigned to the consumer after the rebalance. Unfortunately, the need to commit offsets prior to rebalancing makes this tricky. The other option suggested by Andy would be to introduce a third method in the rebalance listener (e.g. doOffsetCommit(partitions)). Then the consumer would call doOffsetCommit() prior to every rebalance, but only invoke onPartitionsRevoked() when partitions have actually been assigned to another consumer following the rebalance. Either way, we're making the API more complex, which would be nice to avoid unless really necessary. Overall, I think my feeling at the moment is that the sticky assignor is a nice improvement over the currently available assignors, but the gain seems a little marginal and maybe not worth the cost of the complexity mentioned above. It's not a strong feeling though and it would be nice to hear what others think. The other thing worth mentioning is that we've talked a few times in the past about the concept of "partial rebalancing," which would allow the group to reassign only a subset of the partitions it was consuming. This would let part of the group continue consuming while the group is rebalancing. We don't have any proposals ready to support this, but if we want to have this long term, then it might reduce some of the benefit provided by the sticky assignor. Thanks, Jason On Thu, Jun 23, 2016 at 5:04 PM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > Thank you Andy for your feedback on the KIP. > > I agree with Jason on the responses he provided below. > > If we give precedence to fairness over stickiness there is no assumption > that can be made about which assignment would remain and which would be > revoked. > If we give precedence to stickiness over fairness, we can be sure that all > existing valid assignments (those with their topic partition still valid) > would remain. > > I'll add your example to the KIP, but this is how it should work with > sticky assignor: > > We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions. > Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two > consumers are subscribed to both t0 and t1. > The assignment using the stick assignor will be: > * C0: [t0p0, t1p0] > * C1: [t0p1, t1p1] > > Now if we add C2 (subscribed to both topics), this is what we get: > * C0: [t1p0] > * C1: [t0p1, t1p1] > * C2: [t0p0] > > I think both range and round robin assignors would produce this: > * C0: [t0p0, t1p1] > * C1: [t0p1] > * C2: [t1p0] > > Regards, > --Vahid > > > > > From: Jason Gustafson > To: dev@kafka.apache.org > Date: 06/23/2016 10:06 AM > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Hey Andy, > > Thanks for jumping in. A couple comments: > > In addition, I think it is important that during a rebalance consumers do > > not first have all partitions revoked, only to have a very similar, (or > the > > same!), set reassigned. This is less than initiative and complicates > client > > code unnecessarily. Instead, the `ConsumerPartitionListener` should only > be > > called for true changes in assignment I.e. any new partitions assigned > and > > any existing ones revoked, when comparing the new assignment to the > > previous one. > > > The problem is that the revocation callback is called before you know what > the assignment for the next generation will be. This is necessary for the > consumer to be able to commit offsets for its assigned partitions. 
Once > the > consumer has a new assignment, it is no longer safe to commit offsets from > the previous generation. Unless sticky assignment can give us some > guarantee on which partitions will remain after the rebalance, all of them > must be included in the revocation callback. > > > > There is one last scenario I'd like to highlight that I think the KIP > > should describe: say you have a group consuming from two topics, each > topic > > with two partitions. As of 0.9.0.1 the maximum number of consumers you > can > > have is 2, not 4. With 2 consumers each will get one partition from each > > topic. A third consumer with not have any partitions assigned. This > should > > be fixed by the 'fair' part of the strategy, but it would be good to see > > this covered explicitly in the KIP. > > > This would be true for range assignment, but with 4 partitions total, > round-robin assignment would give one partition to each of the 4 consumers > (assuming subscriptions match). > > Thanks, > Jason > > > On Thu, Jun 23, 2016 at 1:42 AM, A
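For readers following the thread, the alternative callback shape floated above might look roughly like the sketch below. This is purely hypothetical: the method names and the split between the callbacks were only an idea in this discussion and were not adopted.

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

// Hypothetical three-callback listener: doOffsetCommit() would run before every
// rebalance (so in-flight work can be finished and offsets committed), while
// onPartitionsRevoked() would fire only for partitions that actually moved to
// another consumer once the new assignment is known.
public interface ThreeStageRebalanceListener {
    void doOffsetCommit(Collection<TopicPartition> partitions);
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}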
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Thank you Andy for your feedback on the KIP. I agree with Jason on the responses he provided below. If we give precedence to fairness over stickiness there is no assumption that can be made about which assignment would remain and which would be revoked. If we give precedence to stickiness over fairness, we can be sure that all existing valid assignments (those with their topic partition still valid) would remain. I'll add your example to the KIP, but this is how it should work with sticky assignor: We have two consumers C0, C1 and two topics t0, t1 each with 2 partitions. Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two consumers are subscribed to both t0 and t1. The assignment using the stick assignor will be: * C0: [t0p0, t1p0] * C1: [t0p1, t1p1] Now if we add C2 (subscribed to both topics), this is what we get: * C0: [t1p0] * C1: [t0p1, t1p1] * C2: [t0p0] I think both range and round robin assignors would produce this: * C0: [t0p0, t1p1] * C1: [t0p1] * C2: [t1p0] Regards, --Vahid From: Jason Gustafson To: dev@kafka.apache.org Date: 06/23/2016 10:06 AM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hey Andy, Thanks for jumping in. A couple comments: In addition, I think it is important that during a rebalance consumers do > not first have all partitions revoked, only to have a very similar, (or the > same!), set reassigned. This is less than initiative and complicates client > code unnecessarily. Instead, the `ConsumerPartitionListener` should only be > called for true changes in assignment I.e. any new partitions assigned and > any existing ones revoked, when comparing the new assignment to the > previous one. The problem is that the revocation callback is called before you know what the assignment for the next generation will be. This is necessary for the consumer to be able to commit offsets for its assigned partitions. Once the consumer has a new assignment, it is no longer safe to commit offsets from the previous generation. Unless sticky assignment can give us some guarantee on which partitions will remain after the rebalance, all of them must be included in the revocation callback. > There is one last scenario I'd like to highlight that I think the KIP > should describe: say you have a group consuming from two topics, each topic > with two partitions. As of 0.9.0.1 the maximum number of consumers you can > have is 2, not 4. With 2 consumers each will get one partition from each > topic. A third consumer with not have any partitions assigned. This should > be fixed by the 'fair' part of the strategy, but it would be good to see > this covered explicitly in the KIP. This would be true for range assignment, but with 4 partitions total, round-robin assignment would give one partition to each of the 4 consumers (assuming subscriptions match). Thanks, Jason On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates wrote: > Hi all, > > I think sticky assignment is immensely important / useful in many > situations. Apps that use Kafka are many and varied. Any app that stores > any state, either in the form of data from incoming messages, cached > results from previous out-of-process calls or expensive operations, (and > let's face it, that's most!), can see a big negative impact from partition > movement. > > The main issue partition movement brings is that it makes building elastic > services very hard. Consider: you've got an app consuming from Kafka that > locally caches data to improve performance. 
You want the app to auto scale > as the throughout to the topic(s) increases. Currently, when one or more > new instance are added and the group rebalances, all existing instances > have all partitions revoked, and then a new, potentially quite different, > set assigned. An intuitive pattern is to evict partition state, I.e. the > cached data, when a partition is revoked. So in this case all apps flush > their entire cache causing throughput to drop massively, right when you > want to increase it! > > Even if the app is not flushing partition state when partitions are > revoked, the lack of a 'sticky' strategy means that a proportion of the > cached state is now useless, and instances have partitions assigned for > which they have no cached state, again negatively impacting throughout. > > With a 'sticky' strategy throughput can be maintained and indeed increased, > as intended. > > The same is also true in the presence of failure. An instance failing, > (maybe due to high load), can invalidate the caching of existing instances, > negatively impacting throughout of the remaining instances, (possibly at a > time the system needs throughput the most!) > > My question would be 'why move parti
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, I appreciate your feedback. Please see my comments below, and advise if you have further suggestions. Thanks. Regards, --Vahid From: Jason Gustafson To: dev@kafka.apache.org Date: 06/22/2016 04:41 PM Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hey Vahid, Thanks for the updates. I think the lack of comments on this KIP suggests that the motivation might need a little work. Here are the two main benefits of this assignor as I see them: 1. It can give a more balanced assignment when subscriptions do not match in a group (this is the same problem solved by KIP-49). 2. It potentially allows applications to save the need to cleanup partition state when rebalancing since partitions are more likely to stay assigned to the same consumer. Does that seem right to you? Yes, it does. Your summarized it nicely. #1 is an advantage of this strategy compared to existing round robin and fair strategies. I think it's unclear how serious the first problem is. Providing better balance when subscriptions differ is nice, but are rolling updates the only scenario where this is encountered? Or are there more general use cases where differing subscriptions could persist for a longer duration? I'm also wondering if this assignor addresses the problem found in KAFKA-2019. It would be useful to confirm whether this problem still exists with the new consumer's round robin strategy and how (whether?) it is addressed by this assignor. I'm not very clear on the first part of this paragraph. You could clarify it for me, but in general balancing out the partitions across consumers in a group as much as possible would normally mean balancing the load within the cluster, and that's something a user would want to have compared to cases where the assignments and therefore the load could be quite unbalanced depending on the subscriptions. Having an optimal balance is definitely more reassuring that knowing partition assignments could get quite unbalanced. There is an example in the KIP that explains a simple use case that leads to an unbalanced assignment with round robin assignment. This unbalance could become much more severe in real use cases with many more topics / partitions / consumers, and that's ideally something we would want to avoid, if possible. Regarding KAFKA-2019, when I try the simple use case of https://issues.apache.org/jira/browse/KAFKA-2019?focusedCommentId=14360892&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14360892 each of my consumers gets 3 partitions, which is not the same as what is mentioned in the comment. I might be missing something in the configuration (except setting the strategy to 'roundrobin', and fetcher threads to '2') or the issue may have been resolved already by some other patch. In any case, the issue based on what I read in the JIRA stems from multiple threads that each consumer may have and how they threads of each consumer are assigned first before assigning partitions to other consumer threads. Since the new consumer is single threaded there is no such problem in its round robin strategy. It simply considers consumers one by one for each partition assignment, and when one consumer is assigned a partition, the next assignment starts with considering the next consumer in the list (and not the same consumer that was just assigned). This removes the possibility of the issue reported in KAFKA-2019 surfacing in the new consumer. 
In the sticky strategy we do not have this issue either, since every time an assignment is about to happen we start with the consumer with the least number of assignments. So we will not have a scenario where a consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that consumer is lagging behind other consumers on the number of partitions assigned). The major selling point seems to be the second point. This is definitely nice to have, but would you expect a lot of value in practice since consumer groups are usually assumed to be stable? It might help to describe some specific use cases to help motivate the proposal. One of the downsides is that it requires users to restructure their code to get any benefit from it. In particular, they need to move partition cleanup out of the onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a little awkward and will probably make explaining the consumer more difficult. It's probably worth including a discussion of this point in the proposal with an example. Even though consumer groups are usually stable, it might be the case that consumers do not initially join the group at the same time. The sticky strategy in that situation lets those who joined earlier stick to their partitions to some extent (assuming fairness takes precedence over stickiness). In terms of specific use cases, Andrew touched on examples of
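The "start with the consumer with the least number of assignments" rule mentioned earlier in this message can be illustrated with a toy greedy loop. This is only a simplified sketch of that balance rule: it ignores stickiness and all the tie-breaking the real assignor needs, and the names are made up. With the four partitions of t0 and t1 and three consumers subscribed to both topics, it yields a 2/1/1 split instead of leaving one consumer empty.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FewestFirstAssignment {

    // Toy version of the balance rule described in this thread: each partition is
    // handed to the subscribed consumer that currently holds the fewest partitions.
    public static Map<String, List<String>> assign(List<String> partitions,
                                                   Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (String partition : partitions) {
            // Partition names are assumed to look like "topic-partitionNumber".
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            String target = null;
            for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
                if (!entry.getValue().contains(topic)) {
                    continue; // this consumer is not subscribed to the topic
                }
                if (target == null
                        || assignment.get(entry.getKey()).size() < assignment.get(target).size()) {
                    target = entry.getKey(); // pick the least-loaded eligible consumer
                }
            }
            if (target != null) {
                assignment.get(target).add(partition);
            }
        }
        return assignment;
    }
}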
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Just adding some related reference here: Henry Cai is contributing some advanced feature in Kafka Streams regarding static assignment: https://github.com/apache/kafka/pull/1543 The main motivation is that when you do rolling bounce for upgrading your Kafka Streams code, for example, you would prefer to not move assigned partitions of the current bouncing instance to others, and today it is worked around by increasing the session.timeout; but what is more tricky is that when the bouncing instance comes back, it will still trigger a rebalance. The idea is that as long as we can encode the previous iteration's assignment map, and we can check that the list of partitions / members does not change regarding to their previous assigned partitions, we keep the assigned as is. Guozhang On Thu, Jun 23, 2016 at 10:24 AM, Andrew Coates wrote: > Hey Jason, > > Good to know on the round robin assignment. I'll look into that. > > The issue I have with the current rebalance listener is that it's not > intuitive and unnecessarily exposes the inner workings of rebalance logic. > When the onPartitionsRevoked method is called it's not really saying the > partitions were revoked. It's really saying a rebalance is happening and > you need to deal with any in-flight partitions & commit offsets. So maybe > the method name is wrong! Maybe it should be 'onRebalance' or > 'commitOffsets'..? Then the interface could also have an > onPartitionsRevoked method that is only called when partitions have been > revoked and given to someone else to handle, rather than just kind of > paused while we rebalance... maybe the new method could be > onPausePartitions? > > Andy > > On Thu, 23 Jun 2016, 18:06 Jason Gustafson, wrote: > > > Hey Andy, > > > > Thanks for jumping in. A couple comments: > > > > In addition, I think it is important that during a rebalance consumers do > > > not first have all partitions revoked, only to have a very similar, (or > > the > > > same!), set reassigned. This is less than initiative and complicates > > client > > > code unnecessarily. Instead, the `ConsumerPartitionListener` should > only > > be > > > called for true changes in assignment I.e. any new partitions assigned > > and > > > any existing ones revoked, when comparing the new assignment to the > > > previous one. > > > > > > The problem is that the revocation callback is called before you know > what > > the assignment for the next generation will be. This is necessary for the > > consumer to be able to commit offsets for its assigned partitions. Once > the > > consumer has a new assignment, it is no longer safe to commit offsets > from > > the previous generation. Unless sticky assignment can give us some > > guarantee on which partitions will remain after the rebalance, all of > them > > must be included in the revocation callback. > > > > > > > There is one last scenario I'd like to highlight that I think the KIP > > > should describe: say you have a group consuming from two topics, each > > topic > > > with two partitions. As of 0.9.0.1 the maximum number of consumers you > > can > > > have is 2, not 4. With 2 consumers each will get one partition from > each > > > topic. A third consumer with not have any partitions assigned. This > > should > > > be fixed by the 'fair' part of the strategy, but it would be good to > see > > > this covered explicitly in the KIP. 
> > > > > > This would be true for range assignment, but with 4 partitions total, > > round-robin assignment would give one partition to each of the 4 > consumers > > (assuming subscriptions match). > > > > Thanks, > > Jason > > > > > > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates < > big.andy.coa...@gmail.com> > > wrote: > > > > > Hi all, > > > > > > I think sticky assignment is immensely important / useful in many > > > situations. Apps that use Kafka are many and varied. Any app that > stores > > > any state, either in the form of data from incoming messages, cached > > > results from previous out-of-process calls or expensive operations, > (and > > > let's face it, that's most!), can see a big negative impact from > > partition > > > movement. > > > > > > The main issue partition movement brings is that it makes building > > elastic > > > services very hard. Consider: you've got an app consuming from Kafka > that > > > locally caches data to improve performance. You want the app to auto > > scale > > > as the throughout to the topic(s) increases. Currently, when one or > > more > > > new instance are added and the group rebalances, all existing instances > > > have all partitions revoked, and then a new, potentially quite > different, > > > set assigned. An intuitive pattern is to evict partition state, I.e. > the > > > cached data, when a partition is revoked. So in this case all apps > flush > > > their entire cache causing throughput to drop massively, right when you > > > want to increase it! > > > > > > Even if the app is not flushing partition state when partitions are
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Jason, Good to know on the round robin assignment. I'll look into that. The issue I have with the current rebalance listener is that it's not intuitive and unnecessarily exposes the inner workings of rebalance logic. When the onPartitionsRevoked method is called it's not really saying the partitions were revoked. It's really saying a rebalance is happening and you need to deal with any in-flight partitions & commit offsets. So maybe the method name is wrong! Maybe it should be 'onRebalance' or 'commitOffsets'..? Then the interface could also have an onPartitionsRevoked method that is only called when partitions have been revoked and given to someone else to handle, rather than just kind of paused while we rebalance... maybe the new method could be onPausePartitions? Andy On Thu, 23 Jun 2016, 18:06 Jason Gustafson, wrote: > Hey Andy, > > Thanks for jumping in. A couple comments: > > In addition, I think it is important that during a rebalance consumers do > > not first have all partitions revoked, only to have a very similar, (or > the > > same!), set reassigned. This is less than initiative and complicates > client > > code unnecessarily. Instead, the `ConsumerPartitionListener` should only > be > > called for true changes in assignment I.e. any new partitions assigned > and > > any existing ones revoked, when comparing the new assignment to the > > previous one. > > > The problem is that the revocation callback is called before you know what > the assignment for the next generation will be. This is necessary for the > consumer to be able to commit offsets for its assigned partitions. Once the > consumer has a new assignment, it is no longer safe to commit offsets from > the previous generation. Unless sticky assignment can give us some > guarantee on which partitions will remain after the rebalance, all of them > must be included in the revocation callback. > > > > There is one last scenario I'd like to highlight that I think the KIP > > should describe: say you have a group consuming from two topics, each > topic > > with two partitions. As of 0.9.0.1 the maximum number of consumers you > can > > have is 2, not 4. With 2 consumers each will get one partition from each > > topic. A third consumer with not have any partitions assigned. This > should > > be fixed by the 'fair' part of the strategy, but it would be good to see > > this covered explicitly in the KIP. > > > This would be true for range assignment, but with 4 partitions total, > round-robin assignment would give one partition to each of the 4 consumers > (assuming subscriptions match). > > Thanks, > Jason > > > On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates > wrote: > > > Hi all, > > > > I think sticky assignment is immensely important / useful in many > > situations. Apps that use Kafka are many and varied. Any app that stores > > any state, either in the form of data from incoming messages, cached > > results from previous out-of-process calls or expensive operations, (and > > let's face it, that's most!), can see a big negative impact from > partition > > movement. > > > > The main issue partition movement brings is that it makes building > elastic > > services very hard. Consider: you've got an app consuming from Kafka that > > locally caches data to improve performance. You want the app to auto > scale > > as the throughout to the topic(s) increases. 
Currently, when one or > more > > new instance are added and the group rebalances, all existing instances > > have all partitions revoked, and then a new, potentially quite different, > > set assigned. An intuitive pattern is to evict partition state, I.e. the > > cached data, when a partition is revoked. So in this case all apps flush > > their entire cache causing throughput to drop massively, right when you > > want to increase it! > > > > Even if the app is not flushing partition state when partitions are > > revoked, the lack of a 'sticky' strategy means that a proportion of the > > cached state is now useless, and instances have partitions assigned for > > which they have no cached state, again negatively impacting throughout. > > > > With a 'sticky' strategy throughput can be maintained and indeed > increased, > > as intended. > > > > The same is also true in the presence of failure. An instance failing, > > (maybe due to high load), can invalidate the caching of existing > instances, > > negatively impacting throughout of the remaining instances, (possibly at > a > > time the system needs throughput the most!) > > > > My question would be 'why move partitions if you don't have to?'. I will > > certainly be setting the 'sticky' assignment strategy as the default once > > it's released, and I have a feeling it will become the default in the > > communitie's 'best-practice' guides. > > > > In addition, I think it is important that during a rebalance consumers do > > not first have all partitions revoked, only to have a very similar, (or > the > > same!), set reassigned. This
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Onur, Your understanding is correct. If a consumer dies and later comes back, with the current proposal, there is no guarantee that it would reclaim its previous assignment. Regards, --Vahid From: Onur Karaman To: dev@kafka.apache.org Date: 06/23/2016 01:03 AM Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy >From what I understood, it seems that stickiness is preserved only for the remaining live consumers. Say a consumer owns some partitions and then dies. Those partitions will get redistributed to the rest of the group. Now if the consumer comes back up, based on the algorithm described with the concept of "reassignable partitions", then the consumer may get different partitions than what it had before. Is my understanding right? Put another way: once coming back up, can the consumer load its UserData with the assignment it had before dying? On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson wrote: > Hey Vahid, > > Thanks for the updates. I think the lack of comments on this KIP suggests > that the motivation might need a little work. Here are the two main > benefits of this assignor as I see them: > > 1. It can give a more balanced assignment when subscriptions do not match > in a group (this is the same problem solved by KIP-49). > 2. It potentially allows applications to save the need to cleanup partition > state when rebalancing since partitions are more likely to stay assigned to > the same consumer. > > Does that seem right to you? > > I think it's unclear how serious the first problem is. Providing better > balance when subscriptions differ is nice, but are rolling updates the only > scenario where this is encountered? Or are there more general use cases > where differing subscriptions could persist for a longer duration? I'm also > wondering if this assignor addresses the problem found in KAFKA-2019. It > would be useful to confirm whether this problem still exists with the new > consumer's round robin strategy and how (whether?) it is addressed by this > assignor. > > The major selling point seems to be the second point. This is definitely > nice to have, but would you expect a lot of value in practice since > consumer groups are usually assumed to be stable? It might help to describe > some specific use cases to help motivate the proposal. One of the downsides > is that it requires users to restructure their code to get any benefit from > it. In particular, they need to move partition cleanup out of the > onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a > little awkward and will probably make explaining the consumer more > difficult. It's probably worth including a discussion of this point in the > proposal with an example. > > Thanks, > Jason > > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian < > vahidhashem...@us.ibm.com > > wrote: > > > Hi Jason, > > > > I updated the KIP and added some details about the user data, the > > assignment algorithm, and the alternative strategies to consider. > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > > Please let me know if I missed to add something. Thank you. > > > > Regards, > > --Vahid > > > > > > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Andy, Thanks for jumping in. A couple comments: In addition, I think it is important that during a rebalance consumers do > not first have all partitions revoked, only to have a very similar, (or the > same!), set reassigned. This is less than initiative and complicates client > code unnecessarily. Instead, the `ConsumerPartitionListener` should only be > called for true changes in assignment I.e. any new partitions assigned and > any existing ones revoked, when comparing the new assignment to the > previous one. The problem is that the revocation callback is called before you know what the assignment for the next generation will be. This is necessary for the consumer to be able to commit offsets for its assigned partitions. Once the consumer has a new assignment, it is no longer safe to commit offsets from the previous generation. Unless sticky assignment can give us some guarantee on which partitions will remain after the rebalance, all of them must be included in the revocation callback. > There is one last scenario I'd like to highlight that I think the KIP > should describe: say you have a group consuming from two topics, each topic > with two partitions. As of 0.9.0.1 the maximum number of consumers you can > have is 2, not 4. With 2 consumers each will get one partition from each > topic. A third consumer with not have any partitions assigned. This should > be fixed by the 'fair' part of the strategy, but it would be good to see > this covered explicitly in the KIP. This would be true for range assignment, but with 4 partitions total, round-robin assignment would give one partition to each of the 4 consumers (assuming subscriptions match). Thanks, Jason On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates wrote: > Hi all, > > I think sticky assignment is immensely important / useful in many > situations. Apps that use Kafka are many and varied. Any app that stores > any state, either in the form of data from incoming messages, cached > results from previous out-of-process calls or expensive operations, (and > let's face it, that's most!), can see a big negative impact from partition > movement. > > The main issue partition movement brings is that it makes building elastic > services very hard. Consider: you've got an app consuming from Kafka that > locally caches data to improve performance. You want the app to auto scale > as the throughout to the topic(s) increases. Currently, when one or more > new instance are added and the group rebalances, all existing instances > have all partitions revoked, and then a new, potentially quite different, > set assigned. An intuitive pattern is to evict partition state, I.e. the > cached data, when a partition is revoked. So in this case all apps flush > their entire cache causing throughput to drop massively, right when you > want to increase it! > > Even if the app is not flushing partition state when partitions are > revoked, the lack of a 'sticky' strategy means that a proportion of the > cached state is now useless, and instances have partitions assigned for > which they have no cached state, again negatively impacting throughout. > > With a 'sticky' strategy throughput can be maintained and indeed increased, > as intended. > > The same is also true in the presence of failure. An instance failing, > (maybe due to high load), can invalidate the caching of existing instances, > negatively impacting throughout of the remaining instances, (possibly at a > time the system needs throughput the most!) 
> > My question would be 'why move partitions if you don't have to?'. I will > certainly be setting the 'sticky' assignment strategy as the default once > it's released, and I have a feeling it will become the default in the > communitie's 'best-practice' guides. > > In addition, I think it is important that during a rebalance consumers do > not first have all partitions revoked, only to have a very similar, (or the > same!), set reassigned. This is less than initiative and complicates client > code unnecessarily. Instead, the `ConsumerPartitionListener` should only be > called for true changes in assignment I.e. any new partitions assigned and > any existing ones revoked, when comparing the new assignment to the > previous one. > > I think the change to how the client listener is called should be part of > this work. > > There is one last scenario I'd like to highlight that I think the KIP > should describe: say you have a group consuming from two topics, each topic > with two partitions. As of 0.9.0.1 the maximum number of consumers you can > have is 2, not 4. With 2 consumers each will get one partition from each > topic. A third consumer with not have any partitions assigned. This should > be fixed by the 'fair' part of the strategy, but it would be good to see > this covered explicitly in the KIP. > > Thanks, > > > Andy > > > > > > > > > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, wrote: > > > Hey Vahid, > > > > Thanks for the updates. I think the
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi all, I think sticky assignment is immensely important / useful in many situations. Apps that use Kafka are many and varied. Any app that stores any state, either in the form of data from incoming messages, cached results from previous out-of-process calls or expensive operations, (and let's face it, that's most!), can see a big negative impact from partition movement. The main issue partition movement brings is that it makes building elastic services very hard. Consider: you've got an app consuming from Kafka that locally caches data to improve performance. You want the app to auto scale as the throughput to the topic(s) increases. Currently, when one or more new instances are added and the group rebalances, all existing instances have all partitions revoked, and then a new, potentially quite different, set assigned. An intuitive pattern is to evict partition state, i.e. the cached data, when a partition is revoked. So in this case all apps flush their entire cache causing throughput to drop massively, right when you want to increase it! Even if the app is not flushing partition state when partitions are revoked, the lack of a 'sticky' strategy means that a proportion of the cached state is now useless, and instances have partitions assigned for which they have no cached state, again negatively impacting throughput. With a 'sticky' strategy throughput can be maintained and indeed increased, as intended. The same is also true in the presence of failure. An instance failing, (maybe due to high load), can invalidate the caching of existing instances, negatively impacting throughput of the remaining instances, (possibly at a time the system needs throughput the most!) My question would be 'why move partitions if you don't have to?'. I will certainly be setting the 'sticky' assignment strategy as the default once it's released, and I have a feeling it will become the default in the community's 'best-practice' guides. In addition, I think it is important that during a rebalance consumers do not first have all partitions revoked, only to have a very similar, (or the same!), set reassigned. This is less than intuitive and complicates client code unnecessarily. Instead, the `ConsumerPartitionListener` should only be called for true changes in assignment, i.e. any new partitions assigned and any existing ones revoked, when comparing the new assignment to the previous one. I think the change to how the client listener is called should be part of this work. There is one last scenario I'd like to highlight that I think the KIP should describe: say you have a group consuming from two topics, each topic with two partitions. As of 0.9.0.1 the maximum number of consumers you can have is 2, not 4. With 2 consumers each will get one partition from each topic. A third consumer will not have any partitions assigned. This should be fixed by the 'fair' part of the strategy, but it would be good to see this covered explicitly in the KIP. Thanks, Andy On Thu, 23 Jun 2016, 00:41 Jason Gustafson, wrote: > Hey Vahid, > > Thanks for the updates. I think the lack of comments on this KIP suggests > that the motivation might need a little work. Here are the two main > benefits of this assignor as I see them: > > 1. It can give a more balanced assignment when subscriptions do not match > in a group (this is the same problem solved by KIP-49). > 2. It potentially allows applications to save the need to cleanup partition > state when rebalancing since partitions are more likely to stay assigned to > the same consumer. 
> > Does that seem right to you? > > I think it's unclear how serious the first problem is. Providing better > balance when subscriptions differ is nice, but are rolling updates the only > scenario where this is encountered? Or are there more general use cases > where differing subscriptions could persist for a longer duration? I'm also > wondering if this assignor addresses the problem found in KAFKA-2019. It > would be useful to confirm whether this problem still exists with the new > consumer's round robin strategy and how (whether?) it is addressed by this > assignor. > > The major selling point seems to be the second point. This is definitely > nice to have, but would you expect a lot of value in practice since > consumer groups are usually assumed to be stable? It might help to describe > some specific use cases to help motivate the proposal. One of the downsides > is that it requires users to restructure their code to get any benefit from > it. In particular, they need to move partition cleanup out of the > onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a > little awkward and will probably make explaining the consumer more > difficult. It's probably worth including a discussion of this point in the > proposal with an example. > > Thanks, > Jason > > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian < > vahidhashem...@us.ibm.com > > wrote: > > > Hi Jason, > > > > I
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
From what I understood, it seems that stickiness is preserved only for the remaining live consumers. Say a consumer owns some partitions and then dies. Those partitions will get redistributed to the rest of the group. Now if the consumer comes back up, based on the algorithm described with the concept of "reassignable partitions", then the consumer may get different partitions than what it had before. Is my understanding right? Put another way: once coming back up, can the consumer load its UserData with the assignment it had before dying? On Wed, Jun 22, 2016 at 4:41 PM, Jason Gustafson wrote: > Hey Vahid, > > Thanks for the updates. I think the lack of comments on this KIP suggests > that the motivation might need a little work. Here are the two main > benefits of this assignor as I see them: > > 1. It can give a more balanced assignment when subscriptions do not match > in a group (this is the same problem solved by KIP-49). > 2. It potentially allows applications to save the need to cleanup partition > state when rebalancing since partitions are more likely to stay assigned to > the same consumer. > > Does that seem right to you? > > I think it's unclear how serious the first problem is. Providing better > balance when subscriptions differ is nice, but are rolling updates the only > scenario where this is encountered? Or are there more general use cases > where differing subscriptions could persist for a longer duration? I'm also > wondering if this assignor addresses the problem found in KAFKA-2019. It > would be useful to confirm whether this problem still exists with the new > consumer's round robin strategy and how (whether?) it is addressed by this > assignor. > > The major selling point seems to be the second point. This is definitely > nice to have, but would you expect a lot of value in practice since > consumer groups are usually assumed to be stable? It might help to describe > some specific use cases to help motivate the proposal. One of the downsides > is that it requires users to restructure their code to get any benefit from > it. In particular, they need to move partition cleanup out of the > onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a > little awkward and will probably make explaining the consumer more > difficult. It's probably worth including a discussion of this point in the > proposal with an example. > > Thanks, > Jason > > > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian < > vahidhashem...@us.ibm.com > > wrote: > > > Hi Jason, > > > > I updated the KIP and added some details about the user data, the > > assignment algorithm, and the alternative strategies to consider. > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > > Please let me know if I missed to add something. Thank you. > > > > Regards, > > --Vahid > > > > > > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Vahid, Thanks for the updates. I think the lack of comments on this KIP suggests that the motivation might need a little work. Here are the two main benefits of this assignor as I see them: 1. It can give a more balanced assignment when subscriptions do not match in a group (this is the same problem solved by KIP-49). 2. It potentially allows applications to save the need to cleanup partition state when rebalancing since partitions are more likely to stay assigned to the same consumer. Does that seem right to you? I think it's unclear how serious the first problem is. Providing better balance when subscriptions differ is nice, but are rolling updates the only scenario where this is encountered? Or are there more general use cases where differing subscriptions could persist for a longer duration? I'm also wondering if this assignor addresses the problem found in KAFKA-2019. It would be useful to confirm whether this problem still exists with the new consumer's round robin strategy and how (whether?) it is addressed by this assignor. The major selling point seems to be the second point. This is definitely nice to have, but would you expect a lot of value in practice since consumer groups are usually assumed to be stable? It might help to describe some specific use cases to help motivate the proposal. One of the downsides is that it requires users to restructure their code to get any benefit from it. In particular, they need to move partition cleanup out of the onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a little awkward and will probably make explaining the consumer more difficult. It's probably worth including a discussion of this point in the proposal with an example. Thanks, Jason On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian wrote: > Hi Jason, > > I updated the KIP and added some details about the user data, the > assignment algorithm, and the alternative strategies to consider. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > Please let me know if I missed to add something. Thank you. > > Regards, > --Vahid > > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, I updated the KIP and added some details about the user data, the assignment algorithm, and the alternative strategies to consider. https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy Please let me know if I missed anything. Thank you. Regards, --Vahid
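For completeness, whichever of these strategies ends up shipping, a consumer would opt in through the existing partition.assignment.strategy setting. The sketch below assumes the assignor is exposed under a class name like org.apache.kafka.clients.consumer.StickyAssignor; that name is an assumption for illustration, not something decided in this thread.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StickyAssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The standard config key for choosing among pluggable assignors on the new
        // consumer; the class name below is assumed for illustration only.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.StickyAssignor");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}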
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, Sorry about my misunderstanding, and thanks for sending the reference. The grammar you sent is correct; that is how the current assignments are preserved in the current implementation. I understand your point about limiting the policies provided with the Kafka release, and the value of providing sticky assignment out of the box. I'm okay with what the community decides in terms of which of these options should go into Kafka. I'll try to document these alternatives in the KIP. Regards, --Vahid From: Jason Gustafson To: dev@kafka.apache.org Date: 06/06/2016 08:14 PM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hi Vahid, The only thing I added was the specification of the UserData field. The rest comes from here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol . See the section on the JoinGroup request. Generally speaking, I think having fewer assignment strategies included with Kafka is probably better. One of the advantages of the client-side assignment approach is that there's no actual need to bundle them into the release. Applications can use them by depending on a separate library. That said, sticky assignment seems like a generally good idea and a common need, so it may be helpful for a lot of users to make it easily available in the release. If it also addresses the issues raised in KIP-49, then so much the better. As for whether we should include both, there I'm not too sure. Most users probably wouldn't have a strong reason to choose the "fair" assignment over the "sticky" assignment since they both seem to have the same properties in terms of balancing the group's partitions. The overhead is a concern for large groups with many topic subscriptions though, so if people think that the "fair" approach brings a lot of benefit over round-robin, then it may be worth including also. -Jason On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian wrote: > Hi Jason, > > Thanks for reviewing the KIP. > I will add the details you requested, but to summarize: > > Regarding the structure of the user data: > > Right now the user data will have the current assignments only which is a > mapping of consumers to their assigned topic partitions. Is this mapping > what you're also suggesting with CurrentAssignment field? > I see how adding a version (as sticky assignor version) will be useful. > Also how having a protocol name would be useful, perhaps for validation. > But could you clarify the "Subscription" field and how you think it'll > come into play? > > > Regarding the algorithm: > > There could be similarities between how this KIP is implemented and how > KIP-49 is handling the fairness. But since we had to take stickiness into > consideration we started fresh and did not adopt from KIP-49. > The Sticky assignor implementation is comprehensive and guarantees the > fairest possible assignment with highest stickiness. I even have a unit > test that randomly generates an assignment problem and verifies that a > fair and sticky assignment is calculated. > KIP-54 gives priority to fairness over stickiness (which makes the > implementation more complex). We could have another strategy that gives > priority to stickiness over fairness (which supposedly will have a better > performance). > The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates > the assignment without considering the previous assignments (fairness > only); whereas for KIP-54 previous assignments play a big role (fairness > and stickiness). 
> I believe if there is a situation where the stickiness requirements do not > exist it would make sense to use a fair-only assignment without the > overhead of sticky assignment, as you mentioned. > So, I could see three different strategies that could enrich assignment > policy options. > It would be great to have some feedback from the community about what is > the best way to move forward with these two KIPs. > > In the meantime, I'll add some more details in the KIP about the approach > for calculating assignments. > > Thanks again. > > Regards, > --Vahid > > > > > From: Jason Gustafson > To: dev@kafka.apache.org > Date: 06/06/2016 01:26 PM > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Hi Vahid, > > Can you add some detail to the KIP on the structure of the user data? I'm > guessing it would be something like this: > > ProtocolName => "sticky" > > ProtocolMetadata => Version Subscription UserData > Version => int16 > Subscription => [Topic] > Topic => string > UserData => CurrentAssignment > Curren
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid, The only thing I added was the specification of the UserData field. The rest comes from here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol. See the section on the JoinGroup request. Generally speaking, I think having fewer assignment strategies included with Kafka is probably better. One of the advantages of the client-side assignment approach is that there's no actual need to bundle them into the release. Applications can use them by depending on a separate library. That said, sticky assignment seems like a generally good idea and a common need, so it may be helpful for a lot of users to make it easily available in the release. If it also addresses the issues raised in KIP-49, then so much the better. As for whether we should include both, there I'm not too sure. Most users probably wouldn't have a strong reason to choose the "fair" assignment over the "sticky" assignment since they both seem to have the same properties in terms of balancing the group's partitions. The overhead is a concern for large groups with many topic subscriptions though, so if people think that the "fair" approach brings a lot of benefit over round-robin, then it may be worth including also. -Jason On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian wrote: > Hi Jason, > > Thanks for reviewing the KIP. > I will add the details you requested, but to summarize: > > Regarding the structure of the user data: > > Right now the user data will have the current assignments only which is a > mapping of consumers to their assigned topic partitions. Is this mapping > what you're also suggesting with CurrentAssignment field? > I see how adding a version (as sticky assignor version) will be useful. > Also how having a protocol name would be useful, perhaps for validation. > But could you clarify the "Subscription" field and how you think it'll > come into play? > > > Regarding the algorithm: > > There could be similarities between how this KIP is implemented and how > KIP-49 is handling the fairness. But since we had to take stickiness into > consideration we started fresh and did not adopt from KIP-49. > The Sticky assignor implementation is comprehensive and guarantees the > fairest possible assignment with highest stickiness. I even have a unit > test that randomly generates an assignment problem and verifies that a > fair and sticky assignment is calculated. > KIP-54 gives priority to fairness over stickiness (which makes the > implementation more complex). We could have another strategy that gives > priority to stickiness over fairness (which supposedly will have a better > performance). > The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates > the assignment without considering the previous assignments (fairness > only); whereas for KIP-54 previous assignments play a big role (fairness > and stickiness). > I believe if there is a situation where the stickiness requirements do not > exist it would make sense to use a fair-only assignment without the > overhead of sticky assignment, as you mentioned. > So, I could see three different strategies that could enrich assignment > policy options. > It would be great to have some feedback from the community about what is > the best way to move forward with these two KIPs. > > In the meantime, I'll add some more details in the KIP about the approach > for calculating assignments. > > Thanks again. 
> > Regards, > --Vahid > > > > > From: Jason Gustafson > To: dev@kafka.apache.org > Date: 06/06/2016 01:26 PM > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Hi Vahid, > > Can you add some detail to the KIP on the structure of the user data? I'm > guessing it would be something like this: > > ProtocolName => "sticky" > > ProtocolMetadata => Version Subscription UserData > Version => int16 > Subscription => [Topic] > Topic => string > UserData => CurrentAssignment > CurrentAssignment => [Topic [Partition]] > Topic => string > Partiton => int32 > > It would also be helpful to include a little more detail on the algorithm. > From what I can tell, it looks like you're adopting some of the strategies > from KIP-49 to handle differing subscriptions better. If so, then I wonder > if it makes sense to combine the two KIPs? Or do you think there would be > an advantage to having the "fair" assignment strategy without the overhead > of the sticky assignor? > > Thanks, > Jason > > > > On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang wrote: > > > Sorry for being late on th
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, Thanks for reviewing the KIP. I will add the details you requested, but to summarize: Regarding the structure of the user data: Right now the user data will have the current assignments only which is a mapping of consumers to their assigned topic partitions. Is this mapping what you're also suggesting with CurrentAssignment field? I see how adding a version (as sticky assignor version) will be useful. Also how having a protocol name would be useful, perhaps for validation. But could you clarify the "Subscription" field and how you think it'll come into play? Regarding the algorithm: There could be similarities between how this KIP is implemented and how KIP-49 is handling the fairness. But since we had to take stickiness into consideration we started fresh and did not adopt from KIP-49. The Sticky assignor implementation is comprehensive and guarantees the fairest possible assignment with highest stickiness. I even have a unit test that randomly generates an assignment problem and verifies that a fair and sticky assignment is calculated. KIP-54 gives priority to fairness over stickiness (which makes the implementation more complex). We could have another strategy that gives priority to stickiness over fairness (which supposedly will have a better performance). The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates the assignment without considering the previous assignments (fairness only); whereas for KIP-54 previous assignments play a big role (fairness and stickiness). I believe if there is a situation where the stickiness requirements do not exist it would make sense to use a fair-only assignment without the overhead of sticky assignment, as you mentioned. So, I could see three different strategies that could enrich assignment policy options. It would be great to have some feedback from the community about what is the best way to move forward with these two KIPs. In the meantime, I'll add some more details in the KIP about the approach for calculating assignments. Thanks again. Regards, --Vahid From: Jason Gustafson To: dev@kafka.apache.org Date: 06/06/2016 01:26 PM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hi Vahid, Can you add some detail to the KIP on the structure of the user data? I'm guessing it would be something like this: ProtocolName => "sticky" ProtocolMetadata => Version Subscription UserData Version => int16 Subscription => [Topic] Topic => string UserData => CurrentAssignment CurrentAssignment => [Topic [Partition]] Topic => string Partiton => int32 It would also be helpful to include a little more detail on the algorithm. >From what I can tell, it looks like you're adopting some of the strategies from KIP-49 to handle differing subscriptions better. If so, then I wonder if it makes sense to combine the two KIPs? Or do you think there would be an advantage to having the "fair" assignment strategy without the overhead of the sticky assignor? Thanks, Jason On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang wrote: > Sorry for being late on this thread. > > The assign() function is auto-triggered during the rebalance by one of the > consumers when it receives all subscription information collected from the > server-side coordinator. 
> > More details can be found here: > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol > > As for Kafka Streams, they way it did "stickiness" is by 1) let all > consumers put their current assigned topic-partitions and server ids into > the "metadata" field of the JoinGroupRequest, 2) when the selected consumer > triggers assign() along with all the subscriptions as well as their > metadata, it can parse the metadata to learn about the existing assignment > map; and hence when making the new assignment it will try to assign > partitions to its current owners "with best effort". > > > Hope this helps. > > > Guozhang > > > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > Hi Guozhang, > > > > I was looking at the implementation of StreamsPartitionAssignor through > > its unit tests and expected to find some tests that > > - verify stickiness by making at least two calls to the assign() method > > (so we check the second assign() call output preserves the assignments > > coming from the first assign() call output); or > > - start off by a preset assignment, call assign() after some subscription > > change, and verify the previous assignment are preserved. > > But none of the methods seem to do these. Did I overlook
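For illustration of the fairness property Vahid describes above (every consumer ending up with as close to the same number of partitions as possible), a minimal balance check of an assignment could look like the sketch below. This assumes all consumers share the same subscription, in which case "fair" reduces to "balanced"; it is only an illustration, not the KIP's actual test code.

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class BalanceCheck {
    // With identical subscriptions, fairness reduces to balance:
    // no consumer owns two or more partitions fewer than another.
    public static boolean isBalanced(Map<String, List<TopicPartition>> assignment) {
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (Collection<TopicPartition> partitions : assignment.values()) {
            min = Math.min(min, partitions.size());
            max = Math.max(max, partitions.size());
        }
        return assignment.isEmpty() || max - min <= 1;
    }
}

A randomized test along the lines Vahid mentions could generate arbitrary consumer/partition counts, run the assignor twice, and assert both isBalanced() and preservation of the previous assignment.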
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid, Can you add some detail to the KIP on the structure of the user data? I'm guessing it would be something like this:

ProtocolName => "sticky"

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => CurrentAssignment
    CurrentAssignment => [Topic [Partition]]
      Topic => string
      Partition => int32

It would also be helpful to include a little more detail on the algorithm. From what I can tell, it looks like you're adopting some of the strategies from KIP-49 to handle differing subscriptions better. If so, then I wonder if it makes sense to combine the two KIPs? Or do you think there would be an advantage to having the "fair" assignment strategy without the overhead of the sticky assignor? Thanks, Jason On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang wrote: > Sorry for being late on this thread. > > The assign() function is auto-triggered during the rebalance by one of the > consumers when it receives all subscription information collected from the > server-side coordinator. > > More details can be found here: > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol > > As for Kafka Streams, they way it did "stickiness" is by 1) let all > consumers put their current assigned topic-partitions and server ids into > the "metadata" field of the JoinGroupRequest, 2) when the selected consumer > triggers assign() along with all the subscriptions as well as their > metadata, it can parse the metadata to learn about the existing assignment > map; and hence when making the new assignment it will try to assign > partitions to its current owners "with best effort". > > > Hope this helps. > > > Guozhang > > > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > Hi Guozhang, > > > > I was looking at the implementation of StreamsPartitionAssignor through > > its unit tests and expected to find some tests that > > - verify stickiness by making at least two calls to the assign() method > > (so we check the second assign() call output preserves the assignments > > coming from the first assign() call output); or > > - start off by a preset assignment, call assign() after some subscription > > change, and verify the previous assignment are preserved. > > But none of the methods seem to do these. Did I overlook them, or > > stickiness is being tested in some other fashion? > > > > Also, if there is a high-level write-up about how this assignor works > > could you please point me to it? Thanks. > > > > Regards. > > --Vahid > > > > > > > > > > From: Guozhang Wang > > To: "dev@kafka.apache.org" > > Date: 05/02/2016 10:34 AM > > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > > > > > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing > > some sort of sticky partitioning mechanism. This is done through the > > userData field though; i.e. all group members send their current "assigned > > partitions" in their join group request, which will be grouped and send to > > the leader, the leader then does best-effort for sticky-partitioning.
> > > > > > Guozhang > > > > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava < > e...@confluent.io> > > wrote: > > > > > I think I'm unclear how we leverage the > > > onPartitionsRevoked/onPartitionsAssigned here in any way that's > > different > > > from our normal usage -- certainly you can use them to generate a diff, > > but > > > you still need to commit when partitions are revoked and that has a > > > non-trivial cost. Are we just saying that you might be able to save > some > > > overhead, e.g. closing/reopening some other resources by doing a flush > > but > > > not a close() or something? You still need to flush any output and > > commit > > > offsets before returning from onPartitionsRevoked, right? Otherwise you > > > couldn't guarantee clean handoff of partitions. > > > > > > In terms of the rebalancing, the basic requirements in the KIP seem > > sound. > > > Passing previous assignment data via UserData also seems reasonable > > since > > > it avoids redistributing all assignment data to all members and doesn't > > > rely on the next generatio
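For concreteness, one possible encoding of the CurrentAssignment user data sketched in Jason's grammar above is a plain ByteBuffer layout like the following. This is purely illustrative; the KIP would define the actual wire format, and the class and method names here are placeholders.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class AssignmentUserData {
    // Encodes topic -> partitions following the sketched grammar:
    // CurrentAssignment => [Topic [Partition]], Topic => string, Partition => int32.
    public static ByteBuffer encode(Map<String, List<Integer>> currentAssignment) {
        int size = 4; // number of topics
        for (Map.Entry<String, List<Integer>> entry : currentAssignment.entrySet()) {
            size += 2 + entry.getKey().getBytes(StandardCharsets.UTF_8).length; // int16 length + topic bytes
            size += 4 + 4 * entry.getValue().size();                            // partition count + partitions
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(currentAssignment.size());
        for (Map.Entry<String, List<Integer>> entry : currentAssignment.entrySet()) {
            byte[] topic = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) topic.length);
            buffer.put(topic);
            buffer.putInt(entry.getValue().size());
            for (int partition : entry.getValue())
                buffer.putInt(partition);
        }
        buffer.flip();
        return buffer;
    }
}

Each member would attach these bytes to its JoinGroup subscription, and the leader would decode them from every member before computing the new assignment.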
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Sorry for being late on this thread. The assign() function is auto-triggered during the rebalance by one of the consumers when it receives all subscription information collected from the server-side coordinator. More details can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol As for Kafka Streams, the way it did "stickiness" is by 1) letting all consumers put their current assigned topic-partitions and server ids into the "metadata" field of the JoinGroupRequest, 2) when the selected consumer triggers assign() along with all the subscriptions as well as their metadata, it can parse the metadata to learn about the existing assignment map; and hence when making the new assignment it will try to assign partitions to its current owners "with best effort". Hope this helps. Guozhang On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > Hi Guozhang, > > I was looking at the implementation of StreamsPartitionAssignor through > its unit tests and expected to find some tests that > - verify stickiness by making at least two calls to the assign() method > (so we check the second assign() call output preserves the assignments > coming from the first assign() call output); or > - start off by a preset assignment, call assign() after some subscription > change, and verify the previous assignment are preserved. > But none of the methods seem to do these. Did I overlook them, or > stickiness is being tested in some other fashion? > > Also, if there is a high-level write-up about how this assignor works > could you please point me to it? Thanks. > > Regards. > --Vahid > > > > > From: Guozhang Wang > To: "dev@kafka.apache.org" > Date: 05/02/2016 10:34 AM > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing > some sort of sticky partitioning mechanism. This is done through the > userData field though; i.e. all group members send their current "assigned > partitions" in their join group request, which will be grouped and send to > the leader, the leader then does best-effort for sticky-partitioning.
Hopefully this shouldn't be surprising since I think I > > discussed this w/ Jason before he updated the relevant wiki pages :) > > > > -Ewen > > > > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < > > vahidhashem...@us.ibm.com> wrote: > > > > > HI Jason, > > > > > > Thanks for your feedback. > > > > > > I believe your suggestion on how to take advantage of this assignor is > > > valid. We can leverage onPartitionsRevoked() and > onPartitionsAssigned() > > > callbacks and do a comparison of assigned partitions before and after > the > > > re-balance and do the cleanup only if there is a change (e.g., if some > > > previously assigned partition is not in the assignment). > > > > > > On your second question, a number of tests that I ran shows that the > old > > > assignments are preserved in the current implementation; except for > when > > > the consumer group leader is killed; in which case, a fresh assignment > is > > > performed. This is something that needs to be fixed. I tried to use > your > > > pointers to find out where the best place is to preserve the old > > > assignment in such circumstan
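A bare-bones version of the leader-side "best effort" step Guozhang describes could look like the sketch below: given what each member claims to have owned previously (decoded from its userData), keep a partition with its previous owner whenever that owner is still present, and spread the rest round-robin. All names are illustrative; this is not the Streams implementation, and unlike KIP-54 it prefers stickiness over fairness (it makes no attempt to rebalance partitions away from their previous owners).

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class StickySketch {
    public static Map<String, List<TopicPartition>> stickyBestEffort(
            Map<String, Set<TopicPartition>> previouslyOwned,   // memberId -> partitions decoded from userData
            List<String> members,
            List<TopicPartition> allPartitions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String member : members)
            assignment.put(member, new ArrayList<TopicPartition>());

        List<TopicPartition> unassigned = new ArrayList<>();
        for (TopicPartition tp : allPartitions) {
            String previousOwner = null;
            for (String member : members) {
                if (previouslyOwned.getOrDefault(member, Collections.<TopicPartition>emptySet()).contains(tp)) {
                    previousOwner = member;
                    break;
                }
            }
            if (previousOwner != null)
                assignment.get(previousOwner).add(tp);   // stickiness: keep the partition where it was
            else
                unassigned.add(tp);
        }
        int next = 0;
        for (TopicPartition tp : unassigned)              // newly available partitions go round-robin
            assignment.get(members.get(next++ % members.size())).add(tp);
        return assignment;
    }
}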
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Guozhang, I was looking at the implementation of StreamsPartitionAssignor through its unit tests and expected to find some tests that - verify stickiness by making at least two calls to the assign() method (so we check the second assign() call output preserves the assignments coming from the first assign() call output); or - start off by a preset assignment, call assign() after some subscription change, and verify the previous assignment are preserved. But none of the methods seem to do these. Did I overlook them, or stickiness is being tested in some other fashion? Also, if there is a high-level write-up about how this assignor works could you please point me to it? Thanks. Regards. --Vahid From: Guozhang Wang To: "dev@kafka.apache.org" Date: 05/02/2016 10:34 AM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing some sort of sticky partitioning mechanism. This is done through the userData field though; i.e. all group members send their current "assigned partitions" in their join group request, which will be grouped and send to the leader, the leader then does best-effort for sticky-partitioning. Guozhang On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava wrote: > I think I'm unclear how we leverage the > onPartitionsRevoked/onPartitionsAssigned here in any way that's different > from our normal usage -- certainly you can use them to generate a diff, but > you still need to commit when partitions are revoked and that has a > non-trivial cost. Are we just saying that you might be able to save some > overhead, e.g. closing/reopening some other resources by doing a flush but > not a close() or something? You still need to flush any output and commit > offsets before returning from onPartitionsRevoked, right? Otherwise you > couldn't guarantee clean handoff of partitions. > > In terms of the rebalancing, the basic requirements in the KIP seem sound. > Passing previous assignment data via UserData also seems reasonable since > it avoids redistributing all assignment data to all members and doesn't > rely on the next generation leader being a member of the current > generation. Hopefully this shouldn't be surprising since I think I > discussed this w/ Jason before he updated the relevant wiki pages :) > > -Ewen > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > HI Jason, > > > > Thanks for your feedback. > > > > I believe your suggestion on how to take advantage of this assignor is > > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() > > callbacks and do a comparison of assigned partitions before and after the > > re-balance and do the cleanup only if there is a change (e.g., if some > > previously assigned partition is not in the assignment). > > > > On your second question, a number of tests that I ran shows that the old > > assignments are preserved in the current implementation; except for when > > the consumer group leader is killed; in which case, a fresh assignment is > > performed. This is something that needs to be fixed. I tried to use your > > pointers to find out where the best place is to preserve the old > > assignment in such circumstances but have not been able to pinpoint it. > If > > you have any suggestion on this please share. Thanks. 
> > > > Regards, > > Vahid Hashemian > > > > > > > > > > From: Jason Gustafson > > To: dev@kafka.apache.org > > Date: 04/14/2016 11:37 AM > > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > > > > > Hi Vahid, > > > > Thanks for the proposal. I think one of the advantages of having sticky > > assignment would be reduce the need to cleanup local partition state > > between rebalances. Do you have any thoughts on how the user would take > > advantage of this assignor in the consumer to do this? Maybe one approach > > is to delay cleanup until you detect a change from the previous > assignment > > in the onPartitionsAssigned() callback? > > > > Also, can you provide some detail on how the sticky assignor works at the > > group protocol level? For example, do you pass old assignments through > the > > "UserData" field in the consumer's JoinGroup? > > > > Thanks, > > Jason > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < > > vahidhashem...@us.ibm.com> wrote: > > > > > Hi all, > > > > > > I have started a new KIP under > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > > > The corresponding JIRA is at > > > https://issues.apache.org/jira/browse/KAFKA-2273 > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > > > > > Your feedback is much appreciated. > > > > > > Regards, > > > Vahid Hashemian > > > > > > > > > > > > > > > > > > > -- > Thanks, > Ewen > -- -- Guozhang
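The kind of test Vahid asks about above could, written against the illustrative stickyBestEffort() helper sketched earlier in this thread (not Kafka's actual test suite), look roughly like this: run the assignment once, drop a member, run it again, and assert that the surviving member kept everything it previously owned.

import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

public class StickinessSketchTest {
    @Test
    public void survivingMemberKeepsItsPartitionsAfterAnotherLeaves() {
        List<TopicPartition> all = Arrays.asList(
                new TopicPartition("t", 0), new TopicPartition("t", 1),
                new TopicPartition("t", 2), new TopicPartition("t", 3));

        // First assign(): two members, no prior ownership information.
        Map<String, List<TopicPartition>> first = StickySketch.stickyBestEffort(
                new HashMap<String, Set<TopicPartition>>(), Arrays.asList("c1", "c2"), all);

        // Second assign(): c2 has left; c1 advertises what it owned before.
        Map<String, Set<TopicPartition>> owned = new HashMap<>();
        owned.put("c1", new HashSet<>(first.get("c1")));
        Map<String, List<TopicPartition>> second = StickySketch.stickyBestEffort(
                owned, Collections.singletonList("c1"), all);

        // Stickiness: everything c1 owned before the rebalance is still assigned to it.
        assertTrue(second.get("c1").containsAll(first.get("c1")));
    }
}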
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Guozhang, Thanks for the pointer. I'll try to take a closer look and get a better understanding and see if there is anything that can be leveraged for KIP-54 implementation. Regards, Vahid Hashemian From: Guozhang Wang To: "dev@kafka.apache.org" Date: 05/02/2016 10:34 AM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing some sort of sticky partitioning mechanism. This is done through the userData field though; i.e. all group members send their current "assigned partitions" in their join group request, which will be grouped and send to the leader, the leader then does best-effort for sticky-partitioning. Guozhang On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava wrote: > I think I'm unclear how we leverage the > onPartitionsRevoked/onPartitionsAssigned here in any way that's different > from our normal usage -- certainly you can use them to generate a diff, but > you still need to commit when partitions are revoked and that has a > non-trivial cost. Are we just saying that you might be able to save some > overhead, e.g. closing/reopening some other resources by doing a flush but > not a close() or something? You still need to flush any output and commit > offsets before returning from onPartitionsRevoked, right? Otherwise you > couldn't guarantee clean handoff of partitions. > > In terms of the rebalancing, the basic requirements in the KIP seem sound. > Passing previous assignment data via UserData also seems reasonable since > it avoids redistributing all assignment data to all members and doesn't > rely on the next generation leader being a member of the current > generation. Hopefully this shouldn't be surprising since I think I > discussed this w/ Jason before he updated the relevant wiki pages :) > > -Ewen > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > HI Jason, > > > > Thanks for your feedback. > > > > I believe your suggestion on how to take advantage of this assignor is > > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() > > callbacks and do a comparison of assigned partitions before and after the > > re-balance and do the cleanup only if there is a change (e.g., if some > > previously assigned partition is not in the assignment). > > > > On your second question, a number of tests that I ran shows that the old > > assignments are preserved in the current implementation; except for when > > the consumer group leader is killed; in which case, a fresh assignment is > > performed. This is something that needs to be fixed. I tried to use your > > pointers to find out where the best place is to preserve the old > > assignment in such circumstances but have not been able to pinpoint it. > If > > you have any suggestion on this please share. Thanks. > > > > Regards, > > Vahid Hashemian > > > > > > > > > > From: Jason Gustafson > > To: dev@kafka.apache.org > > Date: 04/14/2016 11:37 AM > > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > > > > > Hi Vahid, > > > > Thanks for the proposal. I think one of the advantages of having sticky > > assignment would be reduce the need to cleanup local partition state > > between rebalances. Do you have any thoughts on how the user would take > > advantage of this assignor in the consumer to do this? Maybe one approach > > is to delay cleanup until you detect a change from the previous > assignment > > in the onPartitionsAssigned() callback? 
> > > > Also, can you provide some detail on how the sticky assignor works at the > > group protocol level? For example, do you pass old assignments through > the > > "UserData" field in the consumer's JoinGroup? > > > > Thanks, > > Jason > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < > > vahidhashem...@us.ibm.com> wrote: > > > > > Hi all, > > > > > > I have started a new KIP under > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > > > The corresponding JIRA is at > > > https://issues.apache.org/jira/browse/KAFKA-2273 > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > > > > > Your feedback is much appreciated. > > > > > > Regards, > > > Vahid Hashemian > > > > > > > > > > > > > > > > > > > -- > Thanks, > Ewen > -- -- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Ewen, Thank you for reviewing the KIP and providing feedback. I believe the need to commit would still be there, as you mentioned. The main advantage, however, would be when dealing with local state based on partitions assigned, as described in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal or in the corresponding JIRA for this KIP. If consumers perform some processing on re-assignment of partitions (i.e. after a rebalance) it would be more efficient for them to stick to their assigned partitions and reduce the overhead of switching to a new set of partitions (you also referred to some use cases). Unfortunately I don't have a specific use case in mind at the moment, but based on documents like above it seems that consumers can benefit from such a strategy. If you or others can think of specific use cases to enrich the KIP please let me know or directly update the KIP. Regards, Vahid Hashemian From: Ewen Cheslack-Postava To: dev@kafka.apache.org Date: 04/29/2016 09:48 PM Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy I think I'm unclear how we leverage the onPartitionsRevoked/onPartitionsAssigned here in any way that's different from our normal usage -- certainly you can use them to generate a diff, but you still need to commit when partitions are revoked and that has a non-trivial cost. Are we just saying that you might be able to save some overhead, e.g. closing/reopening some other resources by doing a flush but not a close() or something? You still need to flush any output and commit offsets before returning from onPartitionsRevoked, right? Otherwise you couldn't guarantee clean handoff of partitions. In terms of the rebalancing, the basic requirements in the KIP seem sound. Passing previous assignment data via UserData also seems reasonable since it avoids redistributing all assignment data to all members and doesn't rely on the next generation leader being a member of the current generation. Hopefully this shouldn't be surprising since I think I discussed this w/ Jason before he updated the relevant wiki pages :) -Ewen On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > HI Jason, > > Thanks for your feedback. > > I believe your suggestion on how to take advantage of this assignor is > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() > callbacks and do a comparison of assigned partitions before and after the > re-balance and do the cleanup only if there is a change (e.g., if some > previously assigned partition is not in the assignment). > > On your second question, a number of tests that I ran shows that the old > assignments are preserved in the current implementation; except for when > the consumer group leader is killed; in which case, a fresh assignment is > performed. This is something that needs to be fixed. I tried to use your > pointers to find out where the best place is to preserve the old > assignment in such circumstances but have not been able to pinpoint it. If > you have any suggestion on this please share. Thanks. > > Regards, > Vahid Hashemian > > > > > From: Jason Gustafson > To: dev@kafka.apache.org > Date: 04/14/2016 11:37 AM > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Hi Vahid, > > Thanks for the proposal. I think one of the advantages of having sticky > assignment would be reduce the need to cleanup local partition state > between rebalances. 
Do you have any thoughts on how the user would take > advantage of this assignor in the consumer to do this? Maybe one approach > is to delay cleanup until you detect a change from the previous assignment > in the onPartitionsAssigned() callback? > > Also, can you provide some detail on how the sticky assignor works at the > group protocol level? For example, do you pass old assignments through the > "UserData" field in the consumer's JoinGroup? > > Thanks, > Jason > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > Hi all, > > > > I have started a new KIP under > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > The corresponding JIRA is at > > https://issues.apache.org/jira/browse/KAFKA-2273 > > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > > > Your feedback is much appreciated. > > > > Regards, > > Vahid Hashemian > > > > > > > > > -- Thanks, Ewen
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Just FYI, the StreamsPartitionAssignor in Kafka Streams are already doing some sort of sticky partitioning mechanism. This is done through the userData field though; i.e. all group members send their current "assigned partitions" in their join group request, which will be grouped and send to the leader, the leader then does best-effort for sticky-partitioning. Guozhang On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava wrote: > I think I'm unclear how we leverage the > onPartitionsRevoked/onPartitionsAssigned here in any way that's different > from our normal usage -- certainly you can use them to generate a diff, but > you still need to commit when partitions are revoked and that has a > non-trivial cost. Are we just saying that you might be able to save some > overhead, e.g. closing/reopening some other resources by doing a flush but > not a close() or something? You still need to flush any output and commit > offsets before returning from onPartitionsRevoked, right? Otherwise you > couldn't guarantee clean handoff of partitions. > > In terms of the rebalancing, the basic requirements in the KIP seem sound. > Passing previous assignment data via UserData also seems reasonable since > it avoids redistributing all assignment data to all members and doesn't > rely on the next generation leader being a member of the current > generation. Hopefully this shouldn't be surprising since I think I > discussed this w/ Jason before he updated the relevant wiki pages :) > > -Ewen > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > HI Jason, > > > > Thanks for your feedback. > > > > I believe your suggestion on how to take advantage of this assignor is > > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() > > callbacks and do a comparison of assigned partitions before and after the > > re-balance and do the cleanup only if there is a change (e.g., if some > > previously assigned partition is not in the assignment). > > > > On your second question, a number of tests that I ran shows that the old > > assignments are preserved in the current implementation; except for when > > the consumer group leader is killed; in which case, a fresh assignment is > > performed. This is something that needs to be fixed. I tried to use your > > pointers to find out where the best place is to preserve the old > > assignment in such circumstances but have not been able to pinpoint it. > If > > you have any suggestion on this please share. Thanks. > > > > Regards, > > Vahid Hashemian > > > > > > > > > > From: Jason Gustafson > > To: dev@kafka.apache.org > > Date: 04/14/2016 11:37 AM > > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > > > > > Hi Vahid, > > > > Thanks for the proposal. I think one of the advantages of having sticky > > assignment would be reduce the need to cleanup local partition state > > between rebalances. Do you have any thoughts on how the user would take > > advantage of this assignor in the consumer to do this? Maybe one approach > > is to delay cleanup until you detect a change from the previous > assignment > > in the onPartitionsAssigned() callback? > > > > Also, can you provide some detail on how the sticky assignor works at the > > group protocol level? For example, do you pass old assignments through > the > > "UserData" field in the consumer's JoinGroup? 
> > > > Thanks, > > Jason > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < > > vahidhashem...@us.ibm.com> wrote: > > > > > Hi all, > > > > > > I have started a new KIP under > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > > > The corresponding JIRA is at > > > https://issues.apache.org/jira/browse/KAFKA-2273 > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > > > > > Your feedback is much appreciated. > > > > > > Regards, > > > Vahid Hashemian > > > > > > > > > > > > > > > > > > > -- > Thanks, > Ewen > -- -- Guozhang
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
I think I'm unclear how we leverage the onPartitionsRevoked/onPartitionsAssigned here in any way that's different from our normal usage -- certainly you can use them to generate a diff, but you still need to commit when partitions are revoked and that has a non-trivial cost. Are we just saying that you might be able to save some overhead, e.g. closing/reopening some other resources by doing a flush but not a close() or something? You still need to flush any output and commit offsets before returning from onPartitionsRevoked, right? Otherwise you couldn't guarantee clean handoff of partitions. In terms of the rebalancing, the basic requirements in the KIP seem sound. Passing previous assignment data via UserData also seems reasonable since it avoids redistributing all assignment data to all members and doesn't rely on the next generation leader being a member of the current generation. Hopefully this shouldn't be surprising since I think I discussed this w/ Jason before he updated the relevant wiki pages :) -Ewen On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > HI Jason, > > Thanks for your feedback. > > I believe your suggestion on how to take advantage of this assignor is > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() > callbacks and do a comparison of assigned partitions before and after the > re-balance and do the cleanup only if there is a change (e.g., if some > previously assigned partition is not in the assignment). > > On your second question, a number of tests that I ran shows that the old > assignments are preserved in the current implementation; except for when > the consumer group leader is killed; in which case, a fresh assignment is > performed. This is something that needs to be fixed. I tried to use your > pointers to find out where the best place is to preserve the old > assignment in such circumstances but have not been able to pinpoint it. If > you have any suggestion on this please share. Thanks. > > Regards, > Vahid Hashemian > > > > > From: Jason Gustafson > To: dev@kafka.apache.org > Date: 04/14/2016 11:37 AM > Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy > > > > Hi Vahid, > > Thanks for the proposal. I think one of the advantages of having sticky > assignment would be reduce the need to cleanup local partition state > between rebalances. Do you have any thoughts on how the user would take > advantage of this assignor in the consumer to do this? Maybe one approach > is to delay cleanup until you detect a change from the previous assignment > in the onPartitionsAssigned() callback? > > Also, can you provide some detail on how the sticky assignor works at the > group protocol level? For example, do you pass old assignments through the > "UserData" field in the consumer's JoinGroup? > > Thanks, > Jason > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < > vahidhashem...@us.ibm.com> wrote: > > > Hi all, > > > > I have started a new KIP under > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > > > The corresponding JIRA is at > > https://issues.apache.org/jira/browse/KAFKA-2273 > > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > > > Your feedback is much appreciated. > > > > Regards, > > Vahid Hashemian > > > > > > > > > -- Thanks, Ewen
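To make the trade-off Ewen describes concrete, a rebalance listener along the following lines would still flush output and commit offsets in onPartitionsRevoked (so hand-off stays clean if a partition does move), but defer the more expensive per-partition state cleanup until onPartitionsAssigned shows which partitions were actually lost. This is a sketch; flushOutput() and cleanUpLocalState() are application-specific placeholders.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DeferredCleanupListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final Set<TopicPartition> owned = new HashSet<>();

    public DeferredCleanupListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        flushOutput();          // make in-flight output durable, but keep resources open
        consumer.commitSync();  // commit offsets so ownership can move cleanly if it has to
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        Set<TopicPartition> lost = new HashSet<>(owned);
        lost.removeAll(assigned);               // partitions we did not get back
        for (TopicPartition tp : lost)
            cleanUpLocalState(tp);              // expensive cleanup only for real losses
        owned.clear();
        owned.addAll(assigned);
    }

    private void flushOutput() { /* application-specific */ }
    private void cleanUpLocalState(TopicPartition tp) { /* application-specific */ }
}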
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hey Edo, The background is here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal. Yes, it's more work for implementations, but it also opens the door for other use cases (e.g. Kafka Connect is using the same protocol). It also makes it easier for clients to provide their own partition assignor implementations since they don't need to hook into the broker. Thanks, Jason On Thu, Apr 28, 2016 at 4:11 PM, Edoardo Comar wrote: > Hi > > why is the calculation of the partition assignments to group member being > executed by the client (leader of the group), > rather than server (eg by the group Coordinator) ? > > This question came up working with Vahid Hashemian on > https://issues.apache.org/jira/browse/KAFKA-2273 > We have implemented the propagation of the overall assignment solution of > every consumer to every consumers in a group > by using the userdata field in PartitionAssignor.Assignment. > This way, even if the leader dies, any other consumer on becoming the > leader has access to the last computed assignment for everyone. > > However the fact that these pluggable assignment strategies execute on the > client, makes the implementation of clients in other languages more > laborious. > If they were executing in the broker, every language would take advantage > of the available strategies. > > Would it be feasible to move the execution on the server? Is this worth a > new KIP? > > thanks, > Edo > -- > Edoardo Comar > MQ Cloud Technologies > eco...@uk.ibm.com > +44 (0)1962 81 5576 > IBM UK Ltd, Hursley Park, SO21 2JN > > IBM United Kingdom Limited Registered in England and Wales with number > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 > 3AU > Unless stated otherwise above: > IBM United Kingdom Limited - Registered in England and Wales with number > 741598. > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU > >
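As a small illustration of that last point, plugging in a third-party or home-grown assignor is purely a client-side configuration change; no broker involvement is needed. Here "com.example.MyAssignor" is a placeholder for any assignor implementation on the consumer's classpath.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomAssignorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The strategy class only has to be on this consumer's classpath.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "com.example.MyAssignor");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}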
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi why is the calculation of the partition assignments to group members being executed by the client (leader of the group), rather than the server (e.g. by the group Coordinator)? This question came up working with Vahid Hashemian on https://issues.apache.org/jira/browse/KAFKA-2273 We have implemented the propagation of the overall assignment solution of every consumer to every consumer in a group by using the userdata field in PartitionAssignor.Assignment. This way, even if the leader dies, any other consumer on becoming the leader has access to the last computed assignment for everyone. However the fact that these pluggable assignment strategies execute on the client makes the implementation of clients in other languages more laborious. If they were executing in the broker, every language would take advantage of the available strategies. Would it be feasible to move the execution to the server? Is this worth a new KIP? thanks, Edo -- Edoardo Comar MQ Cloud Technologies eco...@uk.ibm.com +44 (0)1962 81 5576 IBM UK Ltd, Hursley Park, SO21 2JN IBM United Kingdom Limited Registered in England and Wales with number 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Jason, Thanks for your feedback. I believe your suggestion on how to take advantage of this assignor is valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned() callbacks and do a comparison of assigned partitions before and after the re-balance and do the cleanup only if there is a change (e.g., if some previously assigned partition is not in the assignment). On your second question, a number of tests that I ran show that the old assignments are preserved in the current implementation, except for when the consumer group leader is killed, in which case a fresh assignment is performed. This is something that needs to be fixed. I tried to use your pointers to find out where the best place is to preserve the old assignment in such circumstances but have not been able to pinpoint it. If you have any suggestion on this please share. Thanks. Regards, Vahid Hashemian From: Jason Gustafson To: dev@kafka.apache.org Date: 04/14/2016 11:37 AM Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy Hi Vahid, Thanks for the proposal. I think one of the advantages of having sticky assignment would be reduce the need to cleanup local partition state between rebalances. Do you have any thoughts on how the user would take advantage of this assignor in the consumer to do this? Maybe one approach is to delay cleanup until you detect a change from the previous assignment in the onPartitionsAssigned() callback? Also, can you provide some detail on how the sticky assignor works at the group protocol level? For example, do you pass old assignments through the "UserData" field in the consumer's JoinGroup? Thanks, Jason On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > Hi all, > > I have started a new KIP under > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > The corresponding JIRA is at > https://issues.apache.org/jira/browse/KAFKA-2273 > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > Your feedback is much appreciated. > > Regards, > Vahid Hashemian > >
Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi Vahid, Thanks for the proposal. I think one of the advantages of having sticky assignment would be to reduce the need to clean up local partition state between rebalances. Do you have any thoughts on how the user would take advantage of this assignor in the consumer to do this? Maybe one approach is to delay cleanup until you detect a change from the previous assignment in the onPartitionsAssigned() callback? Also, can you provide some detail on how the sticky assignor works at the group protocol level? For example, do you pass old assignments through the "UserData" field in the consumer's JoinGroup? Thanks, Jason On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian < vahidhashem...@us.ibm.com> wrote: > Hi all, > > I have started a new KIP under > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy > The corresponding JIRA is at > https://issues.apache.org/jira/browse/KAFKA-2273 > The corresponding PR is at https://github.com/apache/kafka/pull/1020 > > Your feedback is much appreciated. > > Regards, > Vahid Hashemian > >
[DISCUSS] KIP-54 Sticky Partition Assignment Strategy
Hi all, I have started a new KIP under https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy The corresponding JIRA is at https://issues.apache.org/jira/browse/KAFKA-2273 The corresponding PR is at https://github.com/apache/kafka/pull/1020 Your feedback is much appreciated. Regards, Vahid Hashemian