BTW, "roundrobin" was a recent addition so you would need to be on trunk to use that. The partition assignor will lay out all the available consumer threads; and all the available partitions in a deterministic order (based on a hashcode); it then uses a circular iterator over the consumers and the partitions to assign partitions to consumers one after the other.
So suppose there is only one topic with 12 partitions, ordered as follows:
topicA-part-8, topicA-part-5, topicA-part-4, topicA-part-7, topicA-part-10, topicA-part-3, topicA-part-9, topicA-part-2, topicA-part-0, topicA-part-11, topicA-part-1, topicA-part-6

Suppose there are three consumer processes, two streams each. So the laid-out consumer threads would be:
c0-0, c0-1, c1-0, c1-1, c2-0, c2-1

The assignment would be:
topicA-part-8  -> c0-0
topicA-part-5  -> c0-1
topicA-part-4  -> c1-0
topicA-part-7  -> c1-1
topicA-part-10 -> c2-0
topicA-part-3  -> c2-1
...

You can play with the PartitionAssignorTest (unit test) if you want to dig into this more.

Thanks,

Joel

On Thu, Oct 30, 2014 at 02:40:28PM -0700, Bhavesh Mistry wrote:
> Hi Joel,
>
> Correction to my previous question: what is the expected behavior of the roundrobin policy in the above scenario?
>
> Thanks,
>
> Bhavesh
>
> On Thu, Oct 30, 2014 at 1:39 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Joel,
> >
> > I have a similar issue. I have tried partition.assignment.strategy="roundrobin", but how do you expect this to work?
> >
> > We have a topic with 32 partitions and 4 JVMs with 10 threads each (the 8 extra threads are backup in case one of the JVMs goes down). The roundrobin strategy does not use all 4 JVMs, only 3, and distributes threads unevenly across them (the 4th JVM does not get any active consumption threads). What is the best way to evenly (or close to evenly) distribute the consumption threads across JVMs?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Thu, Oct 30, 2014 at 10:07 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> >> > example: launching 4 processes on 4 different machines with 4 threads per process on a 12-partition topic will have each machine with 3 assigned threads and one doing nothing. Moreover, no matter what number of threads each process has, as long as it is bigger than 3, the end result will stay the same: 3 assigned threads per machine, and the rest of them doing nothing.
> >> >
> >> > Ideally, I would want something like a consumer set/ensemble/{whatever word, not group} that will be used to denote a group of threads on a machine, so that when specific threads request to join a consumer group they will be elected so that they are balanced across the machines denoted by the consumer set/ensemble identifier.
> >> >
> >> > will partition.assignment.strategy="roundrobin" help with that?
> >>
> >> You can give it a try. It does have the constraint that the subscription is identical across your machines, i.e., it should work in your above scenario (4 processes on 4 machines with 4 threads per process). The partition assignment will assign partitions to threads in a round-robin manner. The difference of max(owned) and min(owned) will be at most one.
> >>
> >> We can discuss improving partition assignment strategies in the 0.9 release with the new consumer.
> >>
> >> On Thu, Oct 30, 2014 at 08:52:40AM +0200, Shlomi Hazan wrote:
> >> > Jun, Joel,
> >> >
> >> > The issue here is exactly which threads are left out, and which threads are assigned partitions.
> >> > Maybe I am missing something, but what I want is to balance consuming threads across machines/processes, regardless of the number of threads each machine launches (side effect: this way, if you have more threads than partitions, you get a reserve force awaiting to charge in).
> >> >
> >> > example: launching 4 processes on 4 different machines with 4 threads per process on a 12-partition topic will have each machine with 3 assigned threads and one doing nothing. Moreover, no matter what number of threads each process has, as long as it is bigger than 3, the end result will stay the same: 3 assigned threads per machine, and the rest of them doing nothing.
> >> >
> >> > Ideally, I would want something like a consumer set/ensemble/{whatever word, not group} that will be used to denote a group of threads on a machine, so that when specific threads request to join a consumer group they will be elected so that they are balanced across the machines denoted by the consumer set/ensemble identifier.
> >> >
> >> > will partition.assignment.strategy="roundrobin" help with that?
> >> > 10x,
> >> > Shlomi
> >> >
> >> > On Thu, Oct 30, 2014 at 4:00 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >> >
> >> > > Shlomi,
> >> > >
> >> > > If you are on trunk, and your consumer subscriptions are identical, then you can try a slightly different partition assignment strategy. Try setting partition.assignment.strategy="roundrobin" in your consumer config.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Joel
> >> > >
> >> > > On Wed, Oct 29, 2014 at 06:29:30PM -0700, Jun Rao wrote:
> >> > > > By consumer, I actually mean consumer threads (the thread # you used when creating consumer streams). So, if you have 4 consumers, each with 4 threads, 4 of the threads will not get any data with 12 partitions. It sounds like that's not what you get? What's the output of the ConsumerOffsetChecker (see http://kafka.apache.org/documentation.html)?
> >> > > >
> >> > > > For consumer.id, you don't need to set it in general. We generate a uuid automatically.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > > On Tue, Oct 28, 2014 at 4:59 AM, Shlomi Hazan <shl...@viber.com> wrote:
> >> > > >
> >> > > > > Jun,
> >> > > > >
> >> > > > > I hear you say "partitions are evenly distributed among all consumers in the same group", yet I did bump into a case where launching a process with X high-level consumer API threads took over all partitions, leaving the existing consumers unemployed.
> >> > > > >
> >> > > > > According to the claim above, and if I am not mistaken: on a topic T with 12 partitions and 3 consumers C1-C3 in the same group with 4 threads each, adding a new consumer C4 with 12 threads should yield the following balance:
> >> > > > > C1-C3 each relinquish a single partition, holding only 3 partitions each.
> >> > > > > C4 holds the 3 partitions relinquished by C1-C3.
> >> > > > > Yet, in the case I described, what happened is that C4 gained all 12 partitions and sent C1-C3 out of business with 0 partitions each.
> >> > > > > Now maybe I overlooked something, but I think I did see that happen.
> >> > > > >
> >> > > > > BTW
> >> > > > > What key is used to distinguish one consumer from another? "consumer.id"?
> >> > > > > The docs for "consumer.id" say "Generated automatically if not set."
> >> > > > > What is the best practice for setting its value? Leave it empty?
is > >> > > server > >> > > > > host name good enough? what are the considerations? > >> > > > > When using the high level consumer API, are all threads > >> identified as > >> > > the > >> > > > > same consumer? I guess they are, right?... > >> > > > > > >> > > > > Thanks, > >> > > > > Shlomi > >> > > > > > >> > > > > > >> > > > > On Tue, Oct 28, 2014 at 4:21 AM, Jun Rao <jun...@gmail.com> > >> wrote: > >> > > > > > >> > > > > > You can take a look at the "consumer rebalancing algorithm" > >> part in > >> > > > > > http://kafka.apache.org/documentation.html. Basically, > >> partitions > >> > > are > >> > > > > > evenly distributed among all consumers in the same group. If > >> there > >> > > are > >> > > > > more > >> > > > > > consumers in a group than partitions, some consumers will never > >> get > >> > > any > >> > > > > > data. > >> > > > > > > >> > > > > > Thanks, > >> > > > > > > >> > > > > > Jun > >> > > > > > > >> > > > > > On Mon, Oct 27, 2014 at 4:14 AM, Shlomi Hazan <shl...@viber.com > >> > > >> > > wrote: > >> > > > > > > >> > > > > > > Hi All, > >> > > > > > > > >> > > > > > > Using Kafka's high consumer API I have bumped into a situation > >> > > where > >> > > > > > > launching a consumer process P1 with X consuming threads on a > >> topic > >> > > > > with > >> > > > > > X > >> > > > > > > partition kicks out all other existing consumer threads that > >> > > consumed > >> > > > > > prior > >> > > > > > > to launching the process P. > >> > > > > > > That is, consumer process P is stealing all partitions from > >> all > >> > > other > >> > > > > > > consumer processes. > >> > > > > > > > >> > > > > > > While understandable, it makes it hard to size & deploy a > >> cluster > >> > > with > >> > > > > a > >> > > > > > > number of partitions that will both allow balancing of > >> consumption > >> > > > > across > >> > > > > > > consuming processes, dividing the partitions across consumers > >> by > >> > > > > setting > >> > > > > > > each consumer with it's share of the total number of > >> partitions on > >> > > the > >> > > > > > > consumed topic, and on the other hand provide room for growth > >> and > >> > > > > > addition > >> > > > > > > of new consumers to help with increasing traffic into the > >> cluster > >> > > and > >> > > > > the > >> > > > > > > topic. > >> > > > > > > > >> > > > > > > This stealing effect forces me to have more partitions then > >> really > >> > > > > needed > >> > > > > > > at the moment, planning for future growth, or stick to what I > >> need > >> > > and > >> > > > > > > trust the option to add partitions which comes with a price in > >> > > terms of > >> > > > > > > restarting consumers, bumping into out of order messages (hash > >> > > > > > > partitioning) etc. > >> > > > > > > > >> > > > > > > Is this policy of stealing is intended, or did I just jump to > >> > > > > > conclusions? > >> > > > > > > what is the way to cope with the sizing question? > >> > > > > > > > >> > > > > > > Shlomi > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > >> > > > >> > >>