That sounds like a good suggestion. I'm actually looking at the code and I will start another thread for questions about that.
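For reference, the one-thread-per-broker idea discussed in the thread below can be sketched roughly like this. This is only an illustration: the `Part` class is a placeholder standing in for Kafka's `PartitionInfo` (which `consumer.partitionsFor(topic)` returns, including each partition's current leader), and a real version would create one `KafkaConsumer` per thread and call `assign()` with that thread's group of partitions.

```java
import java.util.*;

public class BrokerGrouping {
    // Minimal stand-in for Kafka's PartitionInfo: topic, partition, leader broker id.
    static final class Part {
        final String topic; final int partition; final int leaderId;
        Part(String topic, int partition, int leaderId) {
            this.topic = topic; this.partition = partition; this.leaderId = leaderId;
        }
    }

    // Group partitions by the broker currently leading them, so each group can
    // be handed to its own fetch thread (which would call assign() on it).
    static Map<Integer, List<Part>> groupByLeader(List<Part> partitions) {
        Map<Integer, List<Part>> byLeader = new TreeMap<>();
        for (Part p : partitions) {
            byLeader.computeIfAbsent(p.leaderId, k -> new ArrayList<>()).add(p);
        }
        return byLeader;
    }

    public static void main(String[] args) {
        // Three partitions, two of them led by broker 1, one by broker 2.
        List<Part> parts = Arrays.asList(
            new Part("t", 0, 1), new Part("t", 1, 1), new Part("t", 2, 2));
        Map<Integer, List<Part>> groups = groupByLeader(parts);
        // Two leader brokers -> two fetch threads, one per broker.
        for (Map.Entry<Integer, List<Part>> e : groups.entrySet()) {
            // In the real client: each thread builds its own KafkaConsumer
            // and calls assign() with e.getValue()'s TopicPartitions.
            System.out.println("broker " + e.getKey() + " -> "
                + e.getValue().size() + " partitions");
        }
    }
}
```

Note that leaders can move at runtime, so a real implementation would also have to re-run `partitionsFor()` periodically and re-assign, as the thread discusses.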
On Tue, Nov 17, 2015 at 5:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Thanks for the explanation. Certainly you'd use fewer connections with this
> approach, but it might be worthwhile to do some performance analysis to see
> whether there is much difference in throughput (I'd be interested in seeing
> these results myself). Another approach that might be interesting would be
> to implement your own partition assignor which took into account the
> leaders of each partition. Then you could just use subscribe() and let
> Kafka manage the group for you. This is similar to how we were thinking of
> implementing consumer rack-awareness.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 4:04 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
> > By efficiency, I mean maximizing throughput while minimizing resources on
> > both the broker side and the consumer side.
> >
> > One example: if you have over 200 partitions on 10 brokers and you start
> > 5 consumer processes to consume the data, and each one is single-threaded
> > with round-robin to distribute the load, then each process will try to
> > fetch from over 40 partitions one by one, possibly through 10 connections
> > (50 overall). But if it's smart enough to group partitions by broker,
> > each process can have 2 separate threads (consuming from 2 different
> > brokers concurrently). That seems the more optimal of the two, right?
> >
> > On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Siyuan,
> > >
> > > Your understanding of assign/subscribe is correct. We think of topic
> > > subscription as enabling automatic assignment, as opposed to doing
> > > manual assignment through assign(). We don't currently allow them to
> > > be mixed.
> > >
> > > Can you elaborate on your findings with respect to using one thread
> > > per broker? In what sense was it more efficient?
> > > Doing the same thing might be tricky with the new consumer, but I
> > > think you could do it using partitionsFor() to find the current
> > > partition leaders and assign() to set the assignment in each thread.
> > >
> > > -Jason
> > >
> > > On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com <hsy...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > Maybe I should give a few words about what I'm trying to achieve
> > > > with the new API.
> > > >
> > > > Currently, I'm building a new Kafka connector for Apache Apex
> > > > (http://apex.incubator.apache.org/) using the 0.9.0 API.
> > > > Apex supports dynamic partitioning, so in the old version we managed
> > > > all the consumer partitions with either a 1:1 strategy (each
> > > > consumer process consumes from only one Kafka partition) or a 1:n
> > > > strategy (each consumer process can consume from multiple Kafka
> > > > partitions, distributed round-robin). We also had a separate thread
> > > > to monitor topic metadata changes (leader broker changes, new
> > > > partitions added, using internal APIs like ZkUtils etc.) and
> > > > repartition dynamically based on that (for example, auto-reconnect
> > > > to a new leader broker, or create a new consumer partition for a new
> > > > Kafka partition at runtime). As you can see, the high-level consumer
> > > > doesn't work for this (it can only balance between existing
> > > > consumers unless you manually add a new one). I'm wondering whether
> > > > the new consumer could save us some of the work we did before.
> > > >
> > > > I'm still confused by assign() and subscribe(). My understanding is
> > > > that if you use assign() only, the consumer becomes more like a
> > > > simple consumer, except that if the leader broker changes it
> > > > automatically reconnects to the new leader broker. Is that correct?
> > > > If you use the subscribe() method only, then all the partitions
> > > > will be distributed across the running consumer processes with the
> > > > same "group.id", using "partition.assignment.strategy". Is that
> > > > true?
> > > >
> > > > So I assume assign() and subscribe() (with the group.id and
> > > > partition.assignment.strategy settings) cannot be used together?
> > > >
> > > > Also, with the old API we found that one thread per broker is the
> > > > most efficient way to consume data. For example, if one process
> > > > consumes from p1, p2, p3, and p1, p2 are on one broker b1 while p3
> > > > is on another broker b2, the best thing is to create 2 threads, each
> > > > using the simple consumer API and consuming from only one broker.
> > > > I'm wondering how I would use the new API to do this.
> > > >
> > > > Thanks,
> > > > Siyuan
> > > >
> > > > On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Siyuan,
> > > > >
> > > > > 1) The new consumer is single-threaded; it does not maintain any
> > > > > internal threads like the old high-level consumer does.
> > > > >
> > > > > 2) Each consumer will maintain only one TCP connection with each
> > > > > broker. The only extra socket is the one to its coordinator. That
> > > > > is, if there are three brokers S1, S2, S3, and S1 is the
> > > > > coordinator for this consumer, it will maintain 4 sockets in
> > > > > total: 2 for S1 (one for fetching, one for coordinating) and 1
> > > > > each for S2 and S3 (fetching only).
> > > > >
> > > > > 3) Currently the connection is not closed by the consumer,
> > > > > although the underlying network client / selector will close idle
> > > > > ones after some timeout. So in the worst case it will maintain
> > > > > only N+1 sockets in total for N Kafka brokers at any one time.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com
> > > > > <hsy...@gmail.com> wrote:
> > > > >
> > > > > > The new consumer API looks good. If I understand it correctly,
> > > > > > you can use it like the simple consumer or like the high-level
> > > > > > consumer. But I have a couple of questions about its internal
> > > > > > implementation.
> > > > > >
> > > > > > First of all, does the consumer have any internal fetcher
> > > > > > threads like the high-level consumer?
> > > > > >
> > > > > > When you assign multiple TopicPartitions to a consumer, how many
> > > > > > TCP connections does it establish to the brokers? Is it the same
> > > > > > as the number of leader brokers that host those partitions, or
> > > > > > just the number of TopicPartitions? If there is a leader broker
> > > > > > change, does it establish new connections or use existing
> > > > > > connections to fetch the data? Can it continue consuming? Also,
> > > > > > is the connection kept open until the consumer is closed?
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Best,
> > > > > > Siyuan
> > > > >
> > > > > --
> > > > > -- Guozhang
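As a footnote to Jason's suggestion above of a partition assignor that takes partition leaders into account, the core idea could be sketched roughly as follows. This is a standalone illustration only: the `Part` class and `assign()` method are placeholders, not Kafka's actual assignor interface, and a real implementation would have to plug into the consumer group protocol. The trick shown is simply to sort partitions by leader broker before dealing out contiguous chunks, so each consumer's share clusters on as few brokers as possible.

```java
import java.util.*;

public class LeaderAwareAssign {
    // Minimal stand-in for Kafka's PartitionInfo: topic, partition, leader broker id.
    static final class Part {
        final String topic; final int partition; final int leaderId;
        Part(String topic, int partition, int leaderId) {
            this.topic = topic; this.partition = partition; this.leaderId = leaderId;
        }
    }

    // Sort partitions by their leader broker, then deal out contiguous chunks,
    // so each consumer's partitions sit on as few brokers as possible.
    static List<List<Part>> assign(List<Part> partitions, int numConsumers) {
        List<Part> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparingInt(p -> p.leaderId));
        List<List<Part>> assignment = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) assignment.add(new ArrayList<>());
        int chunk = (sorted.size() + numConsumers - 1) / numConsumers; // ceiling
        for (int i = 0; i < sorted.size(); i++) {
            assignment.get(i / chunk).add(sorted.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The scenario from the thread: 200 partitions across 10 brokers,
        // consumed by 5 consumer processes.
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < 200; i++) parts.add(new Part("t", i, i % 10));
        List<List<Part>> groups = assign(parts, 5);
        for (int i = 0; i < groups.size(); i++) {
            Set<Integer> leaders = new TreeSet<>();
            for (Part p : groups.get(i)) leaders.add(p.leaderId);
            // Each consumer ends up fetching from only 2 of the 10 brokers,
            // instead of all 10 under a plain round-robin assignment.
            System.out.println("consumer " + i + ": " + groups.get(i).size()
                + " partitions on brokers " + leaders);
        }
    }
}
```

With subscribe() plus such an assignor, Kafka would still manage group membership and rebalancing for you, which is the advantage Jason points out over managing assign() threads by hand.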