Guozhang,

Do you mean that each regex-matched topic owns the number of threads passed
in to createMessageStreamsByFilter? For example, in the code below, if I
have 3 matched topics, each of which has 2 partitions, then I should have
3 * 2 = 6 threads in total, with each topic owning 2 threads.

// connector is a kafka.javaapi.consumer.ConsumerConnector created earlier
TopicFilter filter = new Whitelist(".*");
int threadTotal = 2;
List<KafkaStream<byte[], byte[]>> streams =
    connector.createMessageStreamsByFilter(filter, threadTotal);


But what I observed in the log is different:

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
partition 1

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)

2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
partitions consumed by consumer thread
test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
partition 1

2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0


As you can see from the log, only 2 threads are created, and they are shared
among all 3 topics. With this setting, I think parallelism is degraded and a
slow topic may hurt the consumption performance of the other topics. Any
thoughts?
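To check my reading of the log, here is a small self-contained Java model of it. It assumes (my guess from the log, not Kafka's actual code) that the connector creates only threadTotal consumer thread ids and splits each matched topic's partitions range-style across those same ids. The thread ids are hypothetical placeholders; the partition counts are the ones the log reports for kafkatopic-5, kafkatopic-1 and kafkatopic-4.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SharedStreamAssignment {

    // Range-style split of one topic's partitions across the SAME list of
    // consumer thread ids (assumed behaviour, modelled on the log).
    static Map<String, List<Integer>> assign(int partitionCount, List<String> threadIds) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perThread = partitionCount / threadIds.size();
        int withExtra = partitionCount % threadIds.size(); // first threads get one extra
        for (int i = 0; i < threadIds.size(); i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(threadIds.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        int threadTotal = 2; // numStreams passed to createMessageStreamsByFilter
        List<String> threadIds = new ArrayList<>();
        for (int i = 0; i < threadTotal; i++) {
            threadIds.add("consumer-" + i); // hypothetical ids, suffixed -0, -1 like the log
        }

        // Partition counts as reported in the log.
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("kafkatopic-5", 2);
        topics.put("kafkatopic-1", 1);
        topics.put("kafkatopic-4", 2);

        Set<String> busyThreads = new LinkedHashSet<>();
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            for (Map.Entry<String, List<Integer>> e : assign(topic.getValue(), threadIds).entrySet()) {
                if (e.getValue().isEmpty()) {
                    System.out.println(topic.getKey() + ": " + e.getKey() + " gets no partitions");
                } else {
                    busyThreads.add(e.getKey());
                    System.out.println(topic.getKey() + ": " + e.getKey() + " claims " + e.getValue());
                }
            }
        }
        // The same thread ids serve every matched topic.
        System.out.println("distinct consumer threads: " + busyThreads.size());
    }
}
```

Running it gives the same pattern as the log: thread 0 claims partition 0 of every topic, thread 1 claims partition 1 where one exists and gets nothing for the single-partition kafkatopic-1, and only 2 distinct threads serve all 3 topics.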

On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> createMessageStreams is used for consuming from specific topic(s), where
> you can pass a map of [topic-name, num-threads] as its input parameter.
>
> createMessageStreamsByFilter is used for consuming from wildcard topics,
> where you can pass a (regex, num-threads) pair as its input parameters, and
> num-threads threads will be created for each regex-matched topic.
>
> The difference between these two is not really about throughput / latency,
> but rather about consumption semantics.
>
> Guozhang
>
>
> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Hi team,
> >
> > I am comparing ConsumerConnector.createMessageStreams
> > and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > that createMessageStreams creates x threads (x is the number of
> > threads passed in to the method) dedicated to the specified topic,
> > while createMessageStreamsByFilter creates x threads shared by the
> > topics matched by the TopicFilter. Is that correct?
> >
> > If this is the case, I assume createMessageStreams is the preferred way to
> > create streams for each topic if I have high-throughput and low-latency
> > demands. Is my assumption correct?
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Tao
