Thanks Guozhang,

Maybe I should say a few words about what I'm trying to achieve with the
new API.

Currently, I'm building a new Kafka connector for Apache Apex
(http://apex.incubator.apache.org/) using the 0.9.0 API.
Apex supports dynamic partitioning, so in the old version we manage all the
consumer partitions with either a 1:1 strategy (each consumer process
consumes from only one Kafka partition) or a 1:n strategy (each consumer
process can consume from multiple Kafka partitions, distributed round-robin).
We also have a separate thread that monitors topic metadata changes (leader
broker change, new partition added, using internal APIs like ZkUtil etc.)
and repartitions dynamically based on them (for example, auto-reconnecting
to a new leader broker, or creating a new partition at runtime to consume
from a new Kafka partition). As you can see, the high-level consumer doesn't
work for this (it can only balance between existing consumers unless you
manually add a new one). I'm wondering whether the new consumer could save
us some of the work we did before.
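
To make the 1:n strategy concrete, here is a minimal sketch of the
round-robin distribution in plain Java. It is not the actual Apex connector
code: the class and method names are made up for illustration, partitions
are simply numbered 0..n-1, and no Kafka API is involved.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinAssignment {
    /**
     * Distributes partition ids 0..numPartitions-1 over numConsumers
     * consumer processes in round-robin order.
     * Hypothetical helper for illustration only.
     */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            result.add(new ArrayList<>());
        }
        // Partition p goes to consumer p mod numConsumers.
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions over 2 consumers (the 1:n case).
        System.out.println(assign(5, 2)); // prints [[0, 2, 4], [1, 3]]
    }
}
```

With as many consumers as partitions, the same helper degenerates to the
1:1 strategy.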

I'm still confused about assign() and subscribe(). My understanding is that
if you use assign() only, the consumer becomes more like a simple consumer,
except that if the leader broker changes it automatically reconnects to the
new leader broker. Is that correct? If you use subscribe() only, then all
the partitions will be distributed among the running consumer processes with
the same "group.id" using "partition.assignment.strategy". Is that true?

So I assume assign() and subscribe() (and the group.id and
partition.assignment.strategy settings) cannot be used together?

Also, with the old API we found that one thread per broker is the most
efficient way to consume data. For example, if one process consumes from
p1, p2, p3, and p1 and p2 sit on one broker b1 while p3 sits on another
broker b2, the best approach is to create 2 threads, each using the simple
consumer API and consuming from only one broker. I'm wondering how to do
this with the new API.
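
A rough sketch of the grouping I mean, in plain Java. It only models the
"one thread per broker" split from the example above (p1, p2 on b1 and p3
on b2). The groupByLeader helper is hypothetical, the partition-to-leader
map is assumed to come from topic metadata, and no Kafka API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BrokerGrouping {
    /**
     * Groups partition names by the broker that currently leads them.
     * Each resulting list would be handled by one fetch thread.
     * Hypothetical helper for illustration only.
     */
    static Map<String, List<String>> groupByLeader(Map<String, String> leaderOf) {
        // TreeMap keeps brokers in a stable, sorted order.
        Map<String, List<String>> byBroker = new TreeMap<>();
        for (Map.Entry<String, String> e : leaderOf.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), b -> new ArrayList<>())
                    .add(e.getKey());
        }
        return byBroker;
    }

    public static void main(String[] args) {
        // Partition -> leader broker, taken from the example in this mail.
        Map<String, String> leaderOf = new TreeMap<>();
        leaderOf.put("p1", "b1");
        leaderOf.put("p2", "b1");
        leaderOf.put("p3", "b2");
        System.out.println(groupByLeader(leaderOf)); // prints {b1=[p1, p2], b2=[p3]}
    }
}
```

The two entries in the result correspond to the 2 threads in the old
setup. The grouping would have to be recomputed whenever a leader changes.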

Thanks,
Siyuan

On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Siyuan,
>
> 1) The new consumer is single-threaded; it does not maintain any internal
> threads as the old high-level consumer does.
>
> 2) Each consumer will only maintain one TCP connection with each broker.
> The only extra socket is the one with its coordinator. That is, if there
> are three brokers S1, S2, S3, and S1 is the coordinator for this consumer,
> it will maintain 4 sockets in total: 2 for S1 (one for fetching, one for
> coordinating) and 1 each for S2 and S3 (only for fetching).
>
> 3) Currently the connection is not closed by the consumer, although the
> underlying network client / selector will close idle ones after some
> timeout. So in the worst case it will only maintain N+1 sockets in total
> for N Kafka brokers at one time.
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > The new consumer API looks good. If I understand it correctly, you can
> > use it like the simple consumer or the high-level consumer. But I have
> > a couple of questions about its internal implementation.
> >
> > First of all, does the consumer have any internal fetcher threads like
> > the high-level consumer?
> >
> > When you assign multiple TopicPartitions to a consumer, how many TCP
> > connections does it establish to the brokers? Is it the same as the
> > number of leader brokers that host those partitions, or just the number
> > of TopicPartitions? If there is a leader broker change, does it
> > establish new connections or use existing connections to fetch the
> > data? Can it continue consuming? Also, is the connection kept until the
> > consumer is closed?
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>
