Thank you, Guozhang, for your detailed explanation. In your example
createMessageStreamsByFilter("*C" => 3), since threads are shared among
topics, there may be a situation where all 3 threads get stuck on topic AC,
e.g. because the topic is empty, which would block the consumer threads
indefinitely (with consumer.timeout.ms=-1) and leave no thread to serve
topic BC. Do you think this situation can happen?

On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I was not clear before. For createMessageStreamsByFilter, each matched
> topic will have num-threads, but they are shared: i.e. a total of
> num-threads threads will be created, and each thread will be responsible
> for fetching all matched topics.
>
> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> partitions.
>
> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will
> be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively.
>
> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be
> created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6
> respectively.
>
> Guozhang
>
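Guozhang's example above can be reproduced with a small, self-contained Java sketch. It assumes a range-style split: each topic's partitions are cut into contiguous blocks, one per thread, with the first (partitions % threads) threads taking one extra partition. That rule is an assumption chosen because it matches the example, not a copy of Kafka's source, and the names RangeAssignmentSketch, split, dedicated and shared are hypothetical rather than part of Kafka's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Assumption: range-style split of partitions 1..numPartitions across numThreads
    // threads; contiguous blocks, first (numPartitions % numThreads) threads get one extra.
    static List<List<Integer>> split(int numPartitions, int numThreads) {
        List<List<Integer>> ranges = new ArrayList<>();
        int perThread = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        int next = 1; // numbered from 1 to match the AC-1 / BC-1 labels in the example
        for (int t = 0; t < numThreads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                range.add(next++);
            }
            ranges.add(range);
        }
        return ranges;
    }

    // createMessageStreams-style: each topic gets its own threads, not shared.
    static List<List<String>> dedicated(Map<String, Integer> partitions,
                                        Map<String, Integer> threadsPerTopic) {
        List<List<String>> threads = new ArrayList<>();
        for (Map.Entry<String, Integer> e : threadsPerTopic.entrySet()) {
            String topic = e.getKey();
            for (List<Integer> range : split(partitions.get(topic), e.getValue())) {
                List<String> owned = new ArrayList<>();
                for (int p : range) {
                    owned.add(topic + "-" + p);
                }
                threads.add(owned);
            }
        }
        return threads;
    }

    // createMessageStreamsByFilter-style: numThreads threads shared by all matched topics.
    static List<List<String>> shared(Map<String, Integer> partitions, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            List<List<Integer>> ranges = split(e.getValue(), numThreads);
            for (int t = 0; t < numThreads; t++) {
                for (int p : ranges.get(t)) {
                    threads.get(t).add(e.getKey() + "-" + p);
                }
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);

        Map<String, Integer> threadsPerTopic = new LinkedHashMap<>();
        threadsPerTopic.put("AC", 3);
        threadsPerTopic.put("BC", 2);

        System.out.println(dedicated(partitions, threadsPerTopic));
        System.out.println(shared(partitions, 3));
    }
}
```

Running main prints [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]] for the dedicated case (5 threads) and [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]] for the shared case (3 threads). Under the same assumed rule, split(1, 2) returns [[1], []]: with more threads than partitions, one thread gets nothing for that topic, which is consistent with the "No broker partitions consumed by consumer thread" warning for kafkatopic-1 in the log quoted further down (where partitions are numbered from 0).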
> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Do you mean that each regex-matched topic owns the number of threads
> > passed in to createMessageStreamsByFilter? For example, in the code below,
> > if I have 3 matched topics, each of which has 2 partitions, then I should
> > have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> >
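> > import java.util.List;
> > import kafka.consumer.KafkaStream;
> > import kafka.consumer.TopicFilter;
> > import kafka.consumer.Whitelist;
> >
> > // connector is an existing kafka.javaapi.consumer.ConsumerConnector
> > TopicFilter filter = new Whitelist(".*"); // match every topic
> > int threadTotal = 2;
> > List<KafkaStream<byte[], byte[]>> streams =
> >         connector.createMessageStreamsByFilter(filter, threadTotal);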
> >
> >
> > But what I observed in the log is different:
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > partition 1
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> > partitions consumed by consumer thread
> > test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > partition 1
> >
> > 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> >
> > As you can see from the log, only 2 threads are created, and they are
> > shared among 3 topics. With this setting I think parallelism is degraded,
> > and a slow topic may impact the other topics' consumption performance. Any
> > thoughts?
> >
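The concern about a slow topic can be illustrated with a toy model. It assumes, as a simplification, that a stream shared by several topics behaves like one FIFO queue drained by one application thread, and that processing a message costs a fixed, topic-dependent amount of time. The topic names and costs are made up, SharedStreamSketch and finishTimes are hypothetical names, and the sketch models the application's processing loop rather than Kafka's fetcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SharedStreamSketch {

    // Assumption: one thread drains a single FIFO in arrival order, and each message
    // takes costPerTopic.get(topic) time units. Returns the finish times of the
    // messages that belong to the given topic.
    static List<Integer> finishTimes(List<String> fifo, Map<String, Integer> costPerTopic,
                                     String topic) {
        List<Integer> finished = new ArrayList<>();
        int clock = 0;
        for (String t : fifo) {
            clock += costPerTopic.get(t); // the thread is busy with this message
            if (t.equals(topic)) {
                finished.add(clock);
            }
        }
        return finished;
    }

    public static void main(String[] args) {
        // Made-up per-message processing costs.
        Map<String, Integer> cost = Map.of("slowTopic", 10, "fastTopic", 1);
        // Shared stream: messages from both topics interleaved in one queue.
        List<String> shared = List.of("slowTopic", "fastTopic", "slowTopic", "fastTopic");
        // Dedicated stream: only fastTopic's messages.
        List<String> dedicated = List.of("fastTopic", "fastTopic");

        System.out.println(finishTimes(shared, cost, "fastTopic"));    // [11, 22]
        System.out.println(finishTimes(dedicated, cost, "fastTopic")); // [1, 2]
    }
}
```

With these made-up costs, fastTopic's two messages finish at times 11 and 22 on the shared stream versus 1 and 2 on a dedicated one, because they wait behind slowTopic's messages. Map.of and List.of require Java 9 or later.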
> > On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > createMessageStreams is used for consuming from specific topic(s),
> > > where you can pass a map of [topic-name, num-threads] as its input
> > > parameter;
> > >
> > > createMessageStreamsByFilter is used for consuming from wildcard topics,
> > > where you can pass (regex, num-threads) as its input parameters, and for
> > > each regex-matched topic num-threads will be created.
> > >
> > > The difference between the two is not really about throughput /
> > > latency, but rather about consumption semantics.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > >
> > > > Hi team,
> > > >
> > > > I am comparing ConsumerConnector.createMessageStreams and
> > > > ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > > > that createMessageStreams creates x threads (x being the number of
> > > > threads passed in to the method) dedicated to the specified topic,
> > > > while createMessageStreamsByFilter creates x threads shared by the
> > > > topics specified by the TopicFilter. Is that correct?
> > > >
> > > > If this is the case, I assume createMessageStreams is the preferred way
> > > > to create streams for each topic if I have high-throughput and
> > > > low-latency demands. Is my assumption correct?
> > > >
> > > > --
> > > > Regards,
> > > > Tao
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Tao
