BTW, "roundrobin" was a recent addition so you would need to be on
trunk to use that.  The partition assignor will lay out all the
available consumer threads; and all the available partitions in a
deterministic order (based on a hashcode); it then uses a circular
iterator over the consumers and the partitions to assign partitions to
consumers one after the other.
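
For reference, enabling it looks something like this in the consumer
config (a sketch; only partition.assignment.strategy is the setting
under discussion, and the group.id and zookeeper.connect values here
are placeholders):

  group.id=my-group
  zookeeper.connect=localhost:2181
  partition.assignment.strategy=roundrobin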

So suppose there is only one topic with 12 partitions ordered as
follows:
topicA-part-8, topicA-part-5, topicA-part-4, topicA-part-7, topicA-part-10, 
topicA-part-3, topicA-part-9, topicA-part-2, topicA-part-0, topicA-part-11, 
topicA-part-1, topicA-part-6

Suppose there are three consumer processes with two streams each. The
consumer threads would then be laid out as:
c0-0, c0-1, c1-0, c1-1, c2-0, c2-1

The assignment would be:
topicA-part-8 -> c0-0
topicA-part-5 -> c0-1
topicA-part-4 -> c1-0
topicA-part-7 -> c1-1
topicA-part-10 -> c2-0
topicA-part-3 -> c2-1
...
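
If it helps to see the idea in code, here is a minimal standalone
sketch in Java (not the actual Kafka assignor code; the class and
variable names are just illustrative) of the circular-iterator
assignment over the layout above:

  import java.util.*;

  public class RoundRobinSketch {
      public static void main(String[] args) {
          // Partitions in the deterministic (hashcode-based) order from
          // the example above.
          List<String> partitions = Arrays.asList(
              "topicA-part-8", "topicA-part-5", "topicA-part-4", "topicA-part-7",
              "topicA-part-10", "topicA-part-3", "topicA-part-9", "topicA-part-2",
              "topicA-part-0", "topicA-part-11", "topicA-part-1", "topicA-part-6");

          // Three consumer processes, two streams (threads) each.
          List<String> threads = Arrays.asList(
              "c0-0", "c0-1", "c1-0", "c1-1", "c2-0", "c2-1");

          // Circular iterator over the threads: partition i goes to thread
          // i mod N, so the spread between the most- and least-loaded
          // threads is at most one.
          Map<String, List<String>> assignment = new LinkedHashMap<>();
          for (String t : threads) assignment.put(t, new ArrayList<>());
          int i = 0;
          for (String p : partitions) {
              assignment.get(threads.get(i++ % threads.size())).add(p);
          }

          for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
              System.out.println(e.getKey() + " -> " + e.getValue());
          }
      }
  }

With 12 partitions and 6 threads this prints two partitions per thread,
matching the listing above.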

You can play with the PartitionAssignorTest (unit test) if you want to
dig into this more.

Thanks,

Joel

On Thu, Oct 30, 2014 at 02:40:28PM -0700, Bhavesh Mistry wrote:
> HI Joel,
> 
> Correction to my previous question: what is the expected behavior of the
> *roundrobin* policy in the above scenario?
> 
> Thanks,
> 
> Bhavesh
> 
> On Thu, Oct 30, 2014 at 1:39 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> wrote:
> 
> > Hi Joel,
> >
> > I have a similar issue.  I have tried *partition.assignment.strategy=*
> > *"roundrobin"*, but how do you expect this to work?
> >
> > We have a topic with 32 partitions and 4 JVMs with 10 threads each (8 are
> > backups in case one of the JVMs goes down).  The roundrobin strategy does
> > not select all the JVMs, only 3 of them, and the distribution of threads
> > across the 4 JVMs is uneven (the 4th JVM does not get any active
> > consumption threads).  What is the best way to distribute the consumption
> > threads evenly (or close to evenly) across the JVMs?
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Thu, Oct 30, 2014 at 10:07 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> >>
> >> > example: launching 4 processes on 4 different machines with 4 threads
> >> > per process on a 12-partition topic will have each machine with 3
> >> > assigned threads and one doing nothing. Moreover, no matter how many
> >> > threads each process has, as long as it is bigger than 3, the end
> >> > result will stay the same: 3 assigned threads per machine, and the
> >> > rest of them doing nothing.
> >> >
> >> > Ideally, I would want something like a consumer set/ensemble/{whatever
> >> > word, not group} that would be used to denote a group of threads on a
> >> > machine, so that when specific threads request to join a consumer
> >> > group they will be elected so that they are balanced across the
> >> > machines denoted by the consumer set/ensemble identifier.
> >> >
> >> > will partition.assignment.strategy="roundrobin" help with that?
> >>
> >> You can give it a try. It does have the constraint that the
> >> subscription is identical across your machines, i.e., it should work
> >> in your above scenario (4 processes on 4 machines with 4 threads per
> >> process). The partition assignment will assign partitions to threads
> >> in a round-robin manner. The difference between max(owned) and
> >> min(owned) will be exactly one.
> >>
> >> We can discuss improving partition assignment strategies in the 0.9
> >> release with the new consumer.
> >>
> >> On Thu, Oct 30, 2014 at 08:52:40AM +0200, Shlomi Hazan wrote:
> >> > Jun, Joel,
> >> >
> >> > The issue here is exactly which threads are left out, and which
> >> > threads are assigned partitions.
> >> > Maybe I am missing something, but what I want is to balance consuming
> >> > threads across machines/processes, regardless of the number of
> >> > threads the machine launches (side effect: this way, if you have more
> >> > threads than partitions, you get a reserve force waiting to charge in).
> >> >
> >> > example: launching 4 processes on 4 different machines with 4 threads
> >> > per process on a 12-partition topic will have each machine with 3
> >> > assigned threads and one doing nothing. Moreover, no matter how many
> >> > threads each process has, as long as it is bigger than 3, the end
> >> > result will stay the same: 3 assigned threads per machine, and the
> >> > rest of them doing nothing.
> >> >
> >> > Ideally, I would want something like a consumer set/ensemble/{whatever
> >> > word, not group} that would be used to denote a group of threads on a
> >> > machine, so that when specific threads request to join a consumer
> >> > group they will be elected so that they are balanced across the
> >> > machines denoted by the consumer set/ensemble identifier.
> >> >
> >> > will partition.assignment.strategy="roundrobin" help with that?
> >> > 10x,
> >> > Shlomi
> >> >
> >> > On Thu, Oct 30, 2014 at 4:00 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >> >
> >> > > Shlomi,
> >> > >
> >> > > If you are on trunk, and your consumer subscriptions are identical
> >> > > then you can try a slightly different partition assignment strategy.
> >> > > Try setting partition.assignment.strategy="roundrobin" in your
> >> > > consumer config.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Joel
> >> > >
> >> > > On Wed, Oct 29, 2014 at 06:29:30PM -0700, Jun Rao wrote:
> >> > > > By consumer, I actually mean consumer threads (the thread # you
> >> > > > used when creating consumer streams). So, if you have 4 consumers,
> >> > > > each with 4 threads, 4 of the threads will not get any data with
> >> > > > 12 partitions. It sounds like that's not what you get?  What's the
> >> > > > output of the ConsumerOffsetChecker (see
> >> > > > http://kafka.apache.org/documentation.html)?
> >> > > >
> >> > > > For consumer.id, you don't need to set it in general. We generate
> >> > > > some uuid automatically.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > > On Tue, Oct 28, 2014 at 4:59 AM, Shlomi Hazan <shl...@viber.com> wrote:
> >> > > >
> >> > > > > Jun,
> >> > > > >
> >> > > > > I hear you say "partitions are evenly distributed among all
> >> > > > > consumers in the same group", yet I did bump into a case where
> >> > > > > launching a process with X high-level consumer API threads took
> >> > > > > over all partitions, sending the existing consumers into
> >> > > > > unemployment.
> >> > > > >
> >> > > > > According to the claim above, and if I am not mistaken:
> >> > > > > on a topic T with 12 partitions and 3 consumers C1-C3 in the
> >> > > > > same group with 4 threads each,
> >> > > > > adding a new consumer C4 with 12 threads should yield the
> >> > > > > following balance:
> >> > > > > C1-C3 each relinquish a single partition, holding only 3
> >> > > > > partitions each.
> >> > > > > C4 holds the 3 partitions relinquished by C1-C3.
> >> > > > > Yet, in the case I described, what happened is that C4 gained
> >> > > > > all 12 partitions and sent C1-C3 out of business with 0
> >> > > > > partitions each.
> >> > > > > Now maybe I overlooked something, but I think I did see that
> >> > > > > happen.
> >> > > > >
> >> > > > > BTW
> >> > > > > What key is used to distinguish one consumer from another?
> >> > > > > "consumer.id"?
> >> > > > > docs for "consumer.id" are "Generated automatically if not set."
> >> > > > > What is the best practice for setting its value? leave empty? is
> >> > > > > the server host name good enough? what are the considerations?
> >> > > > > When using the high-level consumer API, are all threads
> >> > > > > identified as the same consumer? I guess they are, right?...
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Shlomi
> >> > > > >
> >> > > > >
> >> > > > > On Tue, Oct 28, 2014 at 4:21 AM, Jun Rao <jun...@gmail.com> wrote:
> >> > > > >
> >> > > > > > You can take a look at the "consumer rebalancing algorithm"
> >> > > > > > part in http://kafka.apache.org/documentation.html. Basically,
> >> > > > > > partitions are evenly distributed among all consumers in the
> >> > > > > > same group. If there are more consumers in a group than
> >> > > > > > partitions, some consumers will never get any data.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > >
> >> > > > > > Jun
> >> > > > > >
> >> > > > > > On Mon, Oct 27, 2014 at 4:14 AM, Shlomi Hazan <shl...@viber.com> wrote:
> >> > > > > >
> >> > > > > > > Hi All,
> >> > > > > > >
> >> > > > > > > Using Kafka's high-level consumer API I have bumped into a
> >> > > > > > > situation where launching a consumer process P1 with X
> >> > > > > > > consuming threads on a topic with X partitions kicks out all
> >> > > > > > > other existing consumer threads that consumed prior to
> >> > > > > > > launching the process P1.
> >> > > > > > > That is, consumer process P1 is stealing all partitions from
> >> > > > > > > all other consumer processes.
> >> > > > > > >
> >> > > > > > > While understandable, it makes it hard to size & deploy a
> >> > > > > > > cluster with a number of partitions that will, on the one
> >> > > > > > > hand, allow balancing of consumption across consuming
> >> > > > > > > processes (dividing the partitions across consumers by
> >> > > > > > > giving each consumer its share of the total number of
> >> > > > > > > partitions on the consumed topic), and on the other hand
> >> > > > > > > provide room for growth and the addition of new consumers to
> >> > > > > > > help with increasing traffic into the cluster and the topic.
> >> > > > > > >
> >> > > > > > > This stealing effect forces me to either have more
> >> > > > > > > partitions than really needed at the moment, planning for
> >> > > > > > > future growth, or stick to what I need and trust the option
> >> > > > > > > to add partitions, which comes with a price in terms of
> >> > > > > > > restarting consumers, bumping into out-of-order messages
> >> > > > > > > (hash partitioning), etc.
> >> > > > > > >
> >> > > > > > > Is this policy of stealing intended, or did I just jump to
> >> > > > > > > conclusions?
> >> > > > > > > What is the way to cope with the sizing question?
> >> > > > > > >
> >> > > > > > > Shlomi
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > >
> >> > >
> >>
> >>
