Hi Vahid,

The only thing I added was the specification of the UserData field. The rest comes from here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.
See the section on the JoinGroup request.
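
[Illustration, not part of the KIP] The CurrentAssignment layout proposed further down in this thread, [Topic [Partition]] with Topic => string and Partition => int32, could be serialized roughly as below. This is only a sketch: it assumes the classic primitive encodings from the protocol guide (big-endian integers, strings prefixed by an int16 length, arrays prefixed by an int32 count), and the function names are hypothetical. How the result is framed inside ProtocolMetadata is left out.

```python
import struct


def encode_current_assignment(assignment):
    """Encode {topic: [partition, ...]} as [Topic [Partition]]."""
    out = [struct.pack(">i", len(assignment))]            # outer array count (int32)
    for topic in sorted(assignment):
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)   # string: int16 length + bytes
        partitions = sorted(assignment[topic])
        out.append(struct.pack(">i", len(partitions)))    # inner array count (int32)
        out.append(struct.pack(f">{len(partitions)}i", *partitions))
    return b"".join(out)


def decode_current_assignment(data):
    """Inverse of encode_current_assignment."""
    offset = 0
    (topic_count,) = struct.unpack_from(">i", data, offset)
    offset += 4
    result = {}
    for _ in range(topic_count):
        (name_len,) = struct.unpack_from(">h", data, offset)
        offset += 2
        topic = data[offset:offset + name_len].decode("utf-8")
        offset += name_len
        (count,) = struct.unpack_from(">i", data, offset)
        offset += 4
        result[topic] = list(struct.unpack_from(f">{count}i", data, offset))
        offset += 4 * count
    return result
```

Each member would encode only its own current assignment; the leader decodes one such blob per member when it runs assign().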

Generally speaking, I think having fewer assignment strategies included with Kafka is probably better. One of the advantages of the client-side assignment approach is that there's no actual need to bundle them into the release. Applications can use them by depending on a separate library.

That said, sticky assignment seems like a generally good idea and a common need, so it may be helpful for a lot of users to make it easily available in the release. If it also addresses the issues raised in KIP-49, then so much the better.

As for whether we should include both, there I'm not too sure. Most users probably wouldn't have a strong reason to choose the "fair" assignment over the "sticky" assignment since they both seem to have the same properties in terms of balancing the group's partitions. The overhead is a concern for large groups with many topic subscriptions though, so if people think that the "fair" approach brings a lot of benefit over round-robin, then it may be worth including also.

-Jason

On Mon, Jun 6, 2016 at 5:17 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com> wrote:

> Hi Jason,
>
> Thanks for reviewing the KIP.
> I will add the details you requested, but to summarize:
>
> Regarding the structure of the user data:
>
> Right now the user data will have the current assignments only which is a
> mapping of consumers to their assigned topic partitions. Is this mapping
> what you're also suggesting with CurrentAssignment field?
> I see how adding a version (as sticky assignor version) will be useful.
> Also how having a protocol name would be useful, perhaps for validation.
> But could you clarify the "Subscription" field and how you think it'll
> come into play?
>
>
> Regarding the algorithm:
>
> There could be similarities between how this KIP is implemented and how
> KIP-49 is handling the fairness. But since we had to take stickiness into
> consideration we started fresh and did not adopt from KIP-49.
> The Sticky assignor implementation is comprehensive and guarantees the
> fairest possible assignment with highest stickiness. I even have a unit
> test that randomly generates an assignment problem and verifies that a
> fair and sticky assignment is calculated.
> KIP-54 gives priority to fairness over stickiness (which makes the
> implementation more complex). We could have another strategy that gives
> priority to stickiness over fairness (which supposedly will have a better
> performance).
> The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates
> the assignment without considering the previous assignments (fairness
> only); whereas for KIP-54 previous assignments play a big role (fairness
> and stickiness).
> I believe if there is a situation where the stickiness requirements do not
> exist it would make sense to use a fair-only assignment without the
> overhead of sticky assignment, as you mentioned.
> So, I could see three different strategies that could enrich assignment
> policy options.
> It would be great to have some feedback from the community about what is
> the best way to move forward with these two KIPs.
>
> In the meantime, I'll add some more details in the KIP about the approach
> for calculating assignments.
>
> Thanks again.
>
> Regards,
> --Vahid
>
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 06/06/2016 01:26 PM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
>
> Hi Vahid,
>
> Can you add some detail to the KIP on the structure of the user data? I'm
> guessing it would be something like this:
>
> ProtocolName => "sticky"
>
> ProtocolMetadata => Version Subscription UserData
>   Version => int16
>   Subscription => [Topic]
>     Topic => string
>   UserData => CurrentAssignment
>     CurrentAssignment => [Topic [Partition]]
>       Topic => string
>       Partition => int32
>
> It would also be helpful to include a little more detail on the algorithm.
> From what I can tell, it looks like you're adopting some of the strategies
> from KIP-49 to handle differing subscriptions better. If so, then I wonder
> if it makes sense to combine the two KIPs? Or do you think there would be
> an advantage to having the "fair" assignment strategy without the overhead
> of the sticky assignor?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Sorry for being late on this thread.
> >
> > The assign() function is auto-triggered during the rebalance by one of the
> > consumers when it receives all subscription information collected from the
> > server-side coordinator.
> >
> > More details can be found here:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
> >
> > As for Kafka Streams, the way it did "stickiness" is by 1) letting all
> > consumers put their current assigned topic-partitions and server ids into
> > the "metadata" field of the JoinGroupRequest, 2) when the selected consumer
> > triggers assign() along with all the subscriptions as well as their
> > metadata, it can parse the metadata to learn about the existing assignment
> > map; and hence when making the new assignment it will try to assign
> > partitions to their current owners "with best effort".
> >
> >
> > Hope this helps.
> >
> >
> > Guozhang
> >
> >
> > On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> > vahidhashem...@us.ibm.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I was looking at the implementation of StreamsPartitionAssignor through
> > > its unit tests and expected to find some tests that
> > > - verify stickiness by making at least two calls to the assign() method
> > > (so we check the second assign() call output preserves the assignments
> > > coming from the first assign() call output); or
> > > - start off by a preset assignment, call assign() after some subscription
> > > change, and verify the previous assignments are preserved.
> > > But none of the methods seem to do these. Did I overlook them, or is
> > > stickiness being tested in some other fashion?
> > >
> > > Also, if there is a high-level write-up about how this assignor works
> > > could you please point me to it? Thanks.
> > >
> > > Regards.
> > > --Vahid
> > >
> > >
> > >
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > To: "dev@kafka.apache.org" <dev@kafka.apache.org>
> > > Date: 05/02/2016 10:34 AM
> > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > >
> > >
> > >
> > > Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
> > > some sort of sticky partitioning mechanism. This is done through the
> > > userData field though; i.e. all group members send their current "assigned
> > > partitions" in their join group request, which will be grouped and sent to
> > > the leader; the leader then does best-effort sticky partitioning.
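
[Illustration, not from the thread] A rough sketch of what "best-effort" stickiness on the leader could look like once it has parsed every member's previous assignment. This is neither the KIP-54 algorithm nor the StreamsPartitionAssignor logic. It assumes every member subscribes to the same partitions, and the function and parameter names are hypothetical. Balance comes first, stickiness second: each member keeps at most floor(n/m) of its old partitions, and the remainder go to the least-loaded member, with ties broken in favor of the previous owner.

```python
def sticky_assign(members, partitions, previous):
    """Return {member: [partition, ...]}.

    members:    iterable of member ids
    partitions: list of partition ids available for assignment
    previous:   {member: [partition, ...]} as reported by the members
    """
    members = sorted(members)
    valid = set(partitions)
    floor = len(partitions) // len(members)
    assignment = {m: [] for m in members}
    owner = {}

    # Step 1: every member keeps up to `floor` of its still-valid partitions.
    for m in members:
        for p in previous.get(m, []):
            if p in valid and p not in owner and len(assignment[m]) < floor:
                owner[p] = m
                assignment[m].append(p)

    # Step 2: hand out the rest to the least-loaded member,
    # preferring the previous owner when loads are tied.
    prev_owner = {p: m for m in members for p in previous.get(m, [])}
    for p in partitions:
        if p in owner:
            continue
        target = min(
            members,
            key=lambda m: (len(assignment[m]), m != prev_owner.get(p)),
        )
        owner[p] = target
        assignment[target].append(p)
    return assignment
```

For example, if A and B each own three of six partitions and C joins, A and B each keep two of their old partitions and C picks up the two that were released. Calling the function a second time with its own output as `previous` returns the same assignment, which is the property the unit tests asked about above would check.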
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > > wrote:
> > >
> > > > I think I'm unclear how we leverage the
> > > > onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> > > > from our normal usage -- certainly you can use them to generate a diff, but
> > > > you still need to commit when partitions are revoked and that has a
> > > > non-trivial cost. Are we just saying that you might be able to save some
> > > > overhead, e.g. closing/reopening some other resources by doing a flush but
> > > > not a close() or something? You still need to flush any output and commit
> > > > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > > > couldn't guarantee clean handoff of partitions.
> > > >
> > > > In terms of the rebalancing, the basic requirements in the KIP seem sound.
> > > > Passing previous assignment data via UserData also seems reasonable since
> > > > it avoids redistributing all assignment data to all members and doesn't
> > > > rely on the next generation leader being a member of the current
> > > > generation. Hopefully this shouldn't be surprising since I think I
> > > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > > >
> > > > -Ewen
> > > >
> > > >
> > > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > > vahidhashem...@us.ibm.com> wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > Thanks for your feedback.
> > > > >
> > > > > I believe your suggestion on how to take advantage of this assignor is
> > > > > valid. We can leverage onPartitionsRevoked() and onPartitionsAssigned()
> > > > > callbacks and do a comparison of assigned partitions before and after the
> > > > > re-balance and do the cleanup only if there is a change (e.g., if some
> > > > > previously assigned partition is not in the assignment).
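
[Illustration, not from the thread] The "compare before and after, clean up only on change" idea can be sketched as below. The class is hypothetical and only modeled loosely on the consumer's onPartitionsRevoked()/onPartitionsAssigned() callbacks; it is not a real client API. As Ewen points out above, flushing and committing offsets on revocation is still required. Only the cleanup of local partition state is deferred.

```python
class DeferredCleanupListener:
    """Defers local-state cleanup until the new assignment is known."""

    def __init__(self, cleanup):
        self._cleanup = cleanup      # callable invoked once per lost partition
        self._previous = set()       # partitions held before the rebalance

    def on_partitions_revoked(self, revoked):
        # Flush output and commit offsets here (not shown); keep local state.
        self._previous = set(revoked)

    def on_partitions_assigned(self, assigned):
        # Clean up only the partitions that did not come back to this member.
        lost = self._previous - set(assigned)
        for partition in sorted(lost):
            self._cleanup(partition)
        self._previous = set()
        return lost
```

With a sticky assignor most partitions return to the same member, so `lost` is usually small or empty and the cleanup callback rarely runs.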
> > > > >
> > > > > On your second question, a number of tests that I ran show that the old
> > > > > assignments are preserved in the current implementation, except for when
> > > > > the consumer group leader is killed, in which case a fresh assignment is
> > > > > performed. This is something that needs to be fixed. I tried to use your
> > > > > pointers to find out where the best place is to preserve the old
> > > > > assignment in such circumstances but have not been able to pinpoint it. If
> > > > > you have any suggestion on this please share. Thanks.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > > >
> > > > > From: Jason Gustafson <ja...@confluent.io>
> > > > > To: dev@kafka.apache.org
> > > > > Date: 04/14/2016 11:37 AM
> > > > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > > > >
> > > > >
> > > > >
> > > > > Hi Vahid,
> > > > >
> > > > > Thanks for the proposal. I think one of the advantages of having sticky
> > > > > assignment would be to reduce the need to clean up local partition state
> > > > > between rebalances. Do you have any thoughts on how the user would take
> > > > > advantage of this assignor in the consumer to do this? Maybe one approach
> > > > > is to delay cleanup until you detect a change from the previous assignment
> > > > > in the onPartitionsAssigned() callback?
> > > > >
> > > > > Also, can you provide some detail on how the sticky assignor works at the
> > > > > group protocol level? For example, do you pass old assignments through the
> > > > > "UserData" field in the consumer's JoinGroup?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > > vahidhashem...@us.ibm.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have started a new KIP under
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > > >
> > > > > > The corresponding JIRA is at
> > > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > > > > >
> > > > > > Your feedback is much appreciated.
> > > > > >
> > > > > > Regards,
> > > > > > Vahid Hashemian
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang