Do you know when the new consumer API will be publicly available?

On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Yes, it can get stuck. For example, suppose AC and BC are processed by two
> different processes and the AC processor gets stuck. AC messages will then
> fill up the consumer's buffer and eventually prevent the fetcher thread
> from putting more data into it; the fetcher thread will block on that and
> will not be able to fetch BC.
>
> This issue has been addressed in the new consumer client, which is
> single-threaded with non-blocking APIs.
>
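The blocking described above can be reproduced with a small single-threaded model. This is only a sketch under assumptions: the class name, the per-topic bounded queues, and the fixed fetch order are hypothetical and are not Kafka's actual implementation. It shows how one full queue stalls a shared fetcher so that a healthy topic stops receiving data.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of one fetcher feeding bounded queues; not Kafka code.
class SharedFetcherSketch {

    /**
     * Walks through fetchOrder as a single fetcher would, placing one message
     * per entry into that topic's bounded queue. Topics in stuckTopics are
     * never drained. Returns how many messages each topic's consumer processed
     * before the fetcher hit a full queue (where a blocking put would hang).
     */
    static Map<String, Integer> simulate(List<String> fetchOrder,
                                         int queueCapacity,
                                         Set<String> stuckTopics) {
        Map<String, Queue<String>> queues = new HashMap<>();
        Map<String, Integer> processed = new HashMap<>();
        for (String topic : fetchOrder) {
            Queue<String> queue = queues.computeIfAbsent(topic, t -> new ArrayDeque<>());
            processed.putIfAbsent(topic, 0);
            if (queue.size() >= queueCapacity) {
                // The fetcher would block here forever, so nothing after this
                // point is fetched, including messages for healthy topics.
                break;
            }
            queue.add(topic + "-message");
            if (!stuckTopics.contains(topic)) {
                // A healthy consumer drains its queue right away.
                queue.remove();
                processed.merge(topic, 1, Integer::sum);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> order = Arrays.asList("AC", "BC", "AC", "BC", "AC", "BC", "BC", "BC");
        System.out.println(simulate(order, 2, Collections.singleton("AC")));
        System.out.println(simulate(order, 2, Collections.<String>emptySet()));
    }
}
```

With a queue capacity of 2 and AC stuck, the BC consumer in this model processes only 2 of its 5 messages: the third AC message finds the AC queue full and the fetcher stops. With no stuck topic, all 5 BC messages are processed.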
> Guozhang
>
> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Thank you Guozhang for your detailed explanation. In your example
> > createMessageStreamsByFilter("*C" => 3), since threads are shared among
> > topics, there may be a situation where all 3 threads get stuck on
> > topic AC, e.g. the topic is empty, which will block the threads
> > indefinitely (with consumer.timeout.ms=-1), hence there is no thread to
> > serve topic BC. Do you think this situation will happen?
> >
> > On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I was not clear before: for createMessageStreamsByFilter, each matched
> > > topic will have num-threads, but they are shared; i.e. num-threads threads
> > > will be created in total, and each thread will be responsible for fetching
> > > all matched topics.
> > >
> > > A more concrete example: say you have topic AC with 3 partitions and topic
> > > BC with 6 partitions.
> > >
> > > With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will
> > > be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, and BC-4/5/6 respectively.
> > >
> > > With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be
> > > created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, and AC-3/BC-5/BC-6
> > > respectively.
> > >
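The thread-to-partition split in the example above is consistent with a range-style division: each topic's partitions are cut into contiguous chunks, one chunk per thread. The sketch below reproduces those numbers. It assumes range-style division and 1-based partition labels as used in the example; the class and method names are hypothetical and this is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of range-style assignment; not taken from Kafka.
class RangeAssignmentSketch {

    /**
     * Splits every topic's partitions into contiguous ranges across the same
     * numThreads threads. Returns one list per thread holding labels such as
     * "AC-1" (1-based, to match the example in the thread).
     */
    static List<List<String>> assign(Map<String, Integer> partitionsPerTopic, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int base = partitions / numThreads;   // every thread gets at least this many
            int extra = partitions % numThreads;  // the first 'extra' threads get one more
            int next = 1;
            for (int t = 0; t < numThreads; t++) {
                int count = base + (t < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    perThread.get(t).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return perThread;
    }

    public static void main(String[] args) {
        // Wildcard case: 3 threads shared by both topics.
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("AC", 3);
        topics.put("BC", 6);
        System.out.println(assign(topics, 3));

        // Per-topic case: BC alone with 2 dedicated threads.
        Map<String, Integer> bcOnly = new LinkedHashMap<>();
        bcOnly.put("BC", 6);
        System.out.println(assign(bcOnly, 2));
    }
}
```

Under the same assumption, a topic with 1 partition and 2 threads leaves the second thread with nothing to consume, which matches the "No broker partitions consumed by consumer thread" warning for kafkatopic-1 in the log quoted further down.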
> > > Guozhang
> > >
> > > On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > Do you mean that each regex-matched topic owns the number of threads that
> > > > gets passed in to createMessageStreamsByFilter? For example, in the code
> > > > below, if I have 3 matched topics, each of which has 2 partitions, then I
> > > > should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> > > >
> > > > TopicFilter filter = new Whitelist(".*");
> > > >
> > > >  int threadTotal = 2;
> > > >
> > > >  List<KafkaStream<byte[], byte[]>> streams = connector
> > > > .createMessageStreamsByFilter(filter, threadTotal);
> > > >
> > > >
> > > > But what I observed from the log is different
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > > partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> > > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > > test1234dd5_localhost-1423585444070-82f23758-1)
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > > > partition 1
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > > partition 0
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > > partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> > > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > > test1234dd5_localhost-1423585444070-82f23758-1)
> > > >
> > > > 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> > > > partitions consumed by consumer thread
> > > > test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > > partition 0
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > > > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > > > partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> > > > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > > > test1234dd5_localhost-1423585444070-82f23758-1)
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > > > partition 1
> > > >
> > > > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > > > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > > > partition 0
> > > >
> > > >
> > > > As you can see from the log, only 2 threads are created and shared
> > > > among 3 topics. With this setting I think the parallelism is degraded, and
> > > > a slow topic may impact other topics' consumption performance. Any
> > > > thoughts?
> > > >
> > > > On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > createMessageStreams is used for consuming from specific topic(s),
> > > > > where you can pass a map of [topic-name, num-threads] as its input
> > > > > parameter;
> > > > >
> > > > > createMessageStreamsByFilter is used for consuming from wildcard topics,
> > > > > where you can pass (regex, num-threads) as its input parameters, and for
> > > > > each regex-matched topic num-threads will be created.
> > > > >
> > > > > The difference between these two is not really about throughput /
> > > > > latency, but rather consumption semantics.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > > > >
> > > > > > Hi team,
> > > > > >
> > > > > > I am comparing the differences between
> > > > > > ConsumerConnector.createMessageStreams
> > > > > > and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > > > > > that createMessageStreams creates x threads (x is the number of
> > > > > > threads passed in to the method) dedicated to the specified topic,
> > > > > > while createMessageStreamsByFilter creates x threads shared by the
> > > > > > topics specified by TopicFilter. Is this correct?
> > > > > >
> > > > > > If this is the case, I assume createMessageStreams is the preferred way
> > > > > > to create streams for each topic if I have high throughput and low
> > > > > > latency demands. Is my assumption correct?
> > > > > >
> > > > > > --
> > > > > > Regards,
> > > > > > Tao
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > > Tao
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Tao
