Hi Vahid,

Can you add some detail to the KIP on the structure of the user data? I'm
guessing it would be something like this:

ProtocolName => "sticky"

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => CurrentAssignment
    CurrentAssignment => [Topic [Partition]]
      Topic => string
      Partition => int32
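
To make the guessed layout concrete, here is a minimal sketch of how the CurrentAssignment part of UserData could be serialized. It assumes Kafka's usual wire conventions (big-endian integers, strings prefixed by an int16 length, arrays prefixed by an int32 count); the function names are hypothetical and this is not the KIP's final format.

```python
import struct


def encode_user_data(assignment):
    """Serialize {topic: [partition, ...]} as CurrentAssignment => [Topic [Partition]]."""
    out = [struct.pack(">i", len(assignment))]           # int32 count of topic entries
    for topic, partitions in sorted(assignment.items()):
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)  # string: int16 length + bytes
        out.append(struct.pack(">i", len(partitions)))   # int32 count of partitions
        out.extend(struct.pack(">i", p) for p in partitions)
    return b"".join(out)


def decode_user_data(data):
    """Inverse of encode_user_data; returns {topic: [partition, ...]}."""
    offset = 0
    (n_topics,) = struct.unpack_from(">i", data, offset)
    offset += 4
    assignment = {}
    for _ in range(n_topics):
        (name_len,) = struct.unpack_from(">h", data, offset)
        offset += 2
        topic = data[offset:offset + name_len].decode("utf-8")
        offset += name_len
        (n_parts,) = struct.unpack_from(">i", data, offset)
        offset += 4
        parts = list(struct.unpack_from(">%di" % n_parts, data, offset))
        offset += 4 * n_parts
        assignment[topic] = parts
    return assignment
```

A round trip through decode_user_data(encode_user_data(...)) should return the original mapping.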

It would also be helpful to include a little more detail on the algorithm.
From what I can tell, it looks like you're adopting some of the strategies
from KIP-49 to handle differing subscriptions better. If so, then I wonder
if it makes sense to combine the two KIPs? Or do you think there would be
an advantage to having the "fair" assignment strategy without the overhead
of the sticky assignor?

Thanks,
Jason



On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of the
> consumers when it receives all subscription information collected from the
> server-side coordinator.
>
> More details can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> As for Kafka Streams, the way it achieves "stickiness" is: 1) all
> consumers put their currently assigned topic-partitions and server ids into
> the "metadata" field of the JoinGroupRequest; 2) when the selected consumer
> triggers assign() with all the subscriptions and their
> metadata, it parses the metadata to learn the existing assignment
> map, and then, when making the new assignment, it tries to assign
> partitions to their current owners "with best effort".
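
The two steps above can be sketched roughly as follows. This is a simplified illustration, not the StreamsPartitionAssignor or KIP-54 algorithm: the function name and data shapes are hypothetical, and it makes no attempt to move partitions away from an overloaded owner for fairness.

```python
def sticky_assign(subscriptions, previous, partitions):
    """
    subscriptions: {consumer: set of subscribed topics}
    previous:      {consumer: [(topic, partition), ...]} as reported in metadata
    partitions:    [(topic, partition), ...] that currently exist
    """
    assignment = {consumer: [] for consumer in subscriptions}
    valid = set(partitions)
    taken = set()

    # Pass 1: let each consumer keep what it already owned, provided the
    # partition still exists, nobody else claimed it, and it is still subscribed.
    for consumer in sorted(subscriptions):
        for tp in previous.get(consumer, []):
            if tp in valid and tp not in taken and tp[0] in subscriptions[consumer]:
                assignment[consumer].append(tp)
                taken.add(tp)

    # Pass 2: hand every unowned partition to the least-loaded subscriber.
    for tp in sorted(valid - taken):
        candidates = [c for c in sorted(subscriptions) if tp[0] in subscriptions[c]]
        if not candidates:
            continue  # nobody subscribes to this topic
        target = min(candidates, key=lambda c: len(assignment[c]))
        assignment[target].append(tp)
    return assignment
```

Because each member reports its own previous assignment, the sketch does not depend on which member runs it.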
>
>
> Hope this helps.
>
>
> Guozhang
>
>
> On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi Guozhang,
> >
> > I was looking at the implementation of StreamsPartitionAssignor through
> > its unit tests and expected to find some tests that
> > - verify stickiness by making at least two calls to the assign() method
> > (so we check the second assign() call output preserves the assignments
> > coming from the first assign() call output); or
> > - start off with a preset assignment, call assign() after some subscription
> > change, and verify the previous assignments are preserved.
> > But none of the methods seem to do these. Did I overlook them, or is
> > stickiness being tested in some other fashion?
> >
> > Also, if there is a high-level write-up about how this assignor works
> > could you please point me to it? Thanks.
> >
> > Regards.
> > --Vahid
> >
> >
> >
> >
> > From:   Guozhang Wang <wangg...@gmail.com>
> > To:     "dev@kafka.apache.org" <dev@kafka.apache.org>
> > Date:   05/02/2016 10:34 AM
> > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> >
> >
> >
> > Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
> > some sort of sticky partitioning. This is done through the
> > userData field, though; i.e. all group members send their current "assigned
> > partitions" in their join group request, which are grouped and sent to
> > the leader; the leader then makes a best effort at sticky partitioning.
> >
> >
> > Guozhang
> >
> > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > I think I'm unclear how we leverage the
> > > onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> > > from our normal usage -- certainly you can use them to generate a diff, but
> > > you still need to commit when partitions are revoked and that has a
> > > non-trivial cost. Are we just saying that you might be able to save some
> > > overhead, e.g. closing/reopening some other resources by doing a flush but
> > > not a close() or something? You still need to flush any output and commit
> > > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > > couldn't guarantee clean handoff of partitions.
> > >
> > > In terms of the rebalancing, the basic requirements in the KIP seem sound.
> > > Passing previous assignment data via UserData also seems reasonable since
> > > it avoids redistributing all assignment data to all members and doesn't
> > > rely on the next generation leader being a member of the current
> > > generation. Hopefully this shouldn't be surprising since I think I
> > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > >
> > > -Ewen
> > >
> > >
> > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I believe your suggestion on how to take advantage of this assignor is
> > > > valid. We can leverage the onPartitionsRevoked() and onPartitionsAssigned()
> > > > callbacks, compare the assigned partitions before and after the
> > > > rebalance, and do the cleanup only if there is a change (e.g., if some
> > > > previously assigned partition is not in the new assignment).
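
A rough, language-neutral sketch of that before/after comparison is below. The class and method names are hypothetical and only mirror the consumer's onPartitionsRevoked()/onPartitionsAssigned() callbacks; committing offsets and flushing output on revocation would still be needed and is left out here.

```python
class DeferredCleanupListener:
    """Defers per-partition cleanup until the new assignment is known."""

    def __init__(self, cleanup):
        self.cleanup = cleanup   # callable invoked once per lost partition
        self.previous = set()    # partitions owned before the rebalance

    def on_partitions_revoked(self, revoked):
        # Remember what we owned; do not tear down local state yet.
        self.previous = set(revoked)

    def on_partitions_assigned(self, assigned):
        assigned = set(assigned)
        # Clean up only the partitions that did not come back to us.
        for tp in sorted(self.previous - assigned):
            self.cleanup(tp)
        self.previous = assigned
```

With a sticky assignor most partitions come back to the same member, so the set difference, and therefore the cleanup work, should usually be small.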
> > > >
> > > > On your second question, a number of tests that I ran show that the old
> > > > assignments are preserved in the current implementation, except when
> > > > the consumer group leader is killed, in which case a fresh assignment is
> > > > performed. This is something that needs to be fixed. I tried to use your
> > > > pointers to find the best place to preserve the old
> > > > assignment in such circumstances but have not been able to pinpoint it.
> > > > If you have any suggestions on this, please share. Thanks.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > > >
> > > >
> > > > From:   Jason Gustafson <ja...@confluent.io>
> > > > To:     dev@kafka.apache.org
> > > > Date:   04/14/2016 11:37 AM
> > > > Subject:        Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > > >
> > > >
> > > >
> > > > Hi Vahid,
> > > >
> > > > Thanks for the proposal. I think one of the advantages of having sticky
> > > > assignment would be to reduce the need to clean up local partition state
> > > > between rebalances. Do you have any thoughts on how the user would take
> > > > advantage of this assignor in the consumer to do this? Maybe one approach
> > > > is to delay cleanup until you detect a change from the previous assignment
> > > > in the onPartitionsAssigned() callback?
> > > >
> > > > Also, can you provide some detail on how the sticky assignor works at the
> > > > group protocol level? For example, do you pass old assignments through the
> > > > "UserData" field in the consumer's JoinGroup?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > vahidhashem...@us.ibm.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have started a new KIP under
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > >
> > > > > The corresponding JIRA is at
> > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > The corresponding PR is at
> > > > > https://github.com/apache/kafka/pull/1020
> > > > >
> > > > > Your feedback is much appreciated.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > > > >
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> >
> >
>
>
> --
> -- Guozhang
>
