That sounds like a good suggestion. I'm actually looking at the code and I
will start another thread for questions about that.

On Tue, Nov 17, 2015 at 5:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Thanks for the explanation. Certainly you'd use fewer connections with this
> approach, but it might be worthwhile to do some performance analysis to see
> whether there is much difference in throughput (I'd be interested in seeing
> these results myself). Another approach that might be interesting would be
> to implement your own partition assignor which took into account the
> leaders of each partition. Then you could just use subscribe() and let
> Kafka manage the group for you. This is similar to how we were thinking of
> implementing consumer rack-awareness.
>
> -Jason
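A leader-aware assignor along the lines Jason suggests might, as a rough sketch, deal whole brokers out to group members rather than individual partitions, so each consumer ends up fetching from as few brokers as possible. The class and method names below are illustrative only (a real implementation would plug into Kafka's assignor interface); this plain-Java model just shows the distribution logic:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of leader-aware assignment: bucket partitions by their
// leader broker, then deal the brokers round-robin to consumers, so each
// consumer receives every partition led by "its" brokers.
public class LeaderAwareAssignor {
    public static Map<String, List<Integer>> assign(List<String> consumers,
                                                    Map<Integer, Integer> partitionToLeader) {
        // Bucket partitions by leader broker id (TreeMap for deterministic order).
        Map<Integer, List<Integer>> byBroker = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : partitionToLeader.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), b -> new ArrayList<>()).add(e.getKey());
        }
        // Deal brokers round-robin across consumers.
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        int i = 0;
        for (List<Integer> partitions : byBroker.values()) {
            assignment.get(consumers.get(i % consumers.size())).addAll(partitions);
            i++;
        }
        return assignment;
    }
}
```

With 4 brokers and 2 consumers, each consumer fetches from 2 brokers instead of potentially all 4.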
>
> On Tue, Nov 17, 2015 at 4:04 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > By efficiency, I mean maximizing throughput while minimizing resources on
> > both the broker side and the consumer side.
> >
> > One example: if you have over 200 partitions on 10 brokers and you start
> > 5 consumer processes to consume the data, and each one is single-threaded
> > with round-robin used to distribute the load, then each process will try
> > to fetch from over 40 partitions one by one through possibly 10
> > connections (50 overall). But if it's smart enough to group partitions by
> > broker, each process can have 2 separate threads (each consuming from a
> > different broker concurrently). That seems a more optimal solution than
> > the other, right?
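The grouping Siyuan describes can be sketched in plain Java (the class and method names here are illustrative, not part of any Kafka API): given each partition's leader broker, collapse the per-partition fetch list into one group per broker, so a process assigned 40 partitions spread over 10 brokers needs at most 10 fetch connections rather than one iteration per partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: group a consumer's assigned partitions by leader
// broker, so one fetch thread/connection can serve all partitions on a broker.
public class PartitionGrouper {
    // partitionToLeader maps each partition id to its leader broker id; in a
    // real client this information would come from topic metadata.
    public static Map<Integer, List<Integer>> groupByLeader(Map<Integer, Integer> partitionToLeader) {
        Map<Integer, List<Integer>> byBroker = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> e : partitionToLeader.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), b -> new ArrayList<>()).add(e.getKey());
        }
        return byBroker;
    }
}
```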
> >
> > On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Siyuan,
> > >
> > > Your understanding about assign/subscribe is correct. We think of topic
> > > subscription as enabling automatic assignment, as opposed to doing
> > > manual assignment through assign(). We don't currently allow them to be
> > > mixed.
> > >
> > > Can you elaborate on your findings with respect to using one thread per
> > > broker? In what sense was it more efficient? Doing the same thing might
> > > be tricky with the new consumer, but I think you could do it using
> > > partitionsFor() to find the current partition leaders and assign() to
> > > set the assignment in each thread.
> > >
> > > -Jason
> > >
> > > On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com <hsy...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > Maybe I should give a few words about what I'm trying to achieve with
> > > > the new API.
> > > >
> > > > Currently, I'm building a new Kafka connector for Apache Apex
> > > > (http://apex.incubator.apache.org/) using the 0.9.0 API.
> > > >
> > > > Apex supports dynamic partitioning, so in the old version we managed
> > > > all the consumer partitions with either a 1:1 strategy (each consumer
> > > > process consumes from only one Kafka partition) or a 1:n strategy
> > > > (each consumer process can consume from multiple Kafka partitions,
> > > > using round-robin to distribute them). We also have a separate thread
> > > > that monitors topic metadata changes (leader broker change, new
> > > > partition added, using internal APIs like ZkUtil, etc.) and does
> > > > dynamic partitioning based on that (for example, auto-reconnecting to
> > > > the new leader broker, or creating a new consumer partition for a new
> > > > Kafka partition at runtime). As you can see, the high-level consumer
> > > > doesn't work here (it can only balance between existing consumers
> > > > unless you manually add a new one). I'm wondering whether the new
> > > > consumer could save us some of the work we did before.
> > > >
> > > > I'm still confused by assign() and subscribe(). My understanding is
> > > > that if you use assign() only, the consumer becomes more like a
> > > > simple consumer, except that if the leader broker changes it
> > > > automatically reconnects to the new leader broker. Is that correct?
> > > > If you use subscribe() only, then all the partitions are distributed
> > > > among the running consumer processes with the same "group.id", using
> > > > "partition.assignment.strategy". Is that true?
> > > >
> > > > So I assume assign() and subscribe() (along with the group.id and
> > > > partition.assignment.strategy settings) cannot be used together?
> > > >
> > > > Also, with the old API we found one thread per broker to be the most
> > > > efficient way to consume data. For example, if one process consumes
> > > > from p1, p2, p3, and p1, p2 sit on one broker b1 while p3 sits on
> > > > another broker b2, the best approach is to create 2 threads, each
> > > > using the simple consumer API and consuming from only one broker.
> > > > I'm wondering how to do this with the new API.
> > > >
> > > > Thanks,
> > > > Siyuan
> > > >
> > > > On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Siyuan,
> > > > >
> > > > > 1) The new consumer is single-threaded; unlike the old high-level
> > > > > consumer, it does not maintain any internal threads.
> > > > >
> > > > > 2) Each consumer will maintain only one TCP connection with each
> > > > > broker. The only extra socket is the one to its coordinator. That
> > > > > is, if there are three brokers S1, S2, S3, and S1 is the
> > > > > coordinator for this consumer, it will maintain 4 sockets in
> > > > > total: 2 for S1 (one for fetching, one for coordinating) and 1
> > > > > each for S2 and S3 (only for fetching).
> > > > >
> > > > > 3) Currently connections are not closed by the consumer, although
> > > > > the underlying network client / selector will close idle ones
> > > > > after some timeout. So in the worst case it will maintain N+1
> > > > > sockets in total for N Kafka brokers at any one time.
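Guozhang's accounting above reduces to a simple worst-case formula: one fetch socket per broker plus one dedicated coordinator socket (the coordinator broker simply ends up with two). A trivial illustrative model (the class name is ours, not Kafka's):

```java
// Illustrative model of the worst-case socket count for the new consumer:
// N fetch sockets (one per broker) + 1 coordinator socket. Which broker is
// the coordinator doesn't change the total; that broker just gets 2 sockets.
public class SocketCount {
    public static int maxSockets(int brokers) {
        return brokers + 1;
    }
}
```

For the three-broker example in the thread this gives 4 sockets, matching Guozhang's count.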
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com <hsy...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > The new consumer API looks good. If I understand it correctly,
> > > > > > you can use it like either the simple consumer or the high-level
> > > > > > consumer. But I have a couple of questions about its internal
> > > > > > implementation.
> > > > > >
> > > > > > First of all, does the consumer have any internal fetcher
> > > > > > threads, like the high-level consumer?
> > > > > >
> > > > > > When you assign multiple TopicPartitions to a consumer, how many
> > > > > > TCP connections does it establish to the brokers? Is it the
> > > > > > number of leader brokers that host those partitions, or just the
> > > > > > number of TopicPartitions? If there is a leader broker change,
> > > > > > does it establish new connections or use existing connections to
> > > > > > fetch the data? Can it continue consuming? Also, are the
> > > > > > connections kept open until the consumer is closed?
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Best,
> > > > > > Siyuan
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
