Guozhang,

Do you mean that each regex-matched topic owns the number of threads that get passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics, each of which has 2 partitions, then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.

    TopicFilter filter = new Whitelist(".*");
    int threadTotal = 2;
    List<KafkaStream<byte[], byte[]>> streams =
        connector.createMessageStreamsByFilter(filter, threadTotal);

But what I observed from the log is different

2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0

As you can see from the log there are only 2 threads created and shared among 3 topics. With this setting I think the parallelism is degraded and a slow topic may impact other topics' consumption performance. Any thoughts?

On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> createMessageStreams is used for consuming from specific topic(s), where
> you can put a map of [topic-name, num-threads] as its input parameters;
>
> createMessageStreamsByFilter is used for consuming from wildcard topics,
> where you can put a (regex, num-threads) as its input parameters, and for
> each regex matched topic num-threads will be created.
>
> The difference between these two are not really for throughput / latency,
> but rather consumption semantics.
>
> Guozhang
>
>
> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Hi team,
> >
> > I am comparing the differences between
> > ConsumerConnector.createMessageStreams
> > and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > that createMessageStreams creates x number of threads (x is the number of
> > threads passed in to the method) dedicated to the specified topic
> > while createMessageStreamsByFilter creates x number of threads shared by
> > topics specified by TopicFilter. Is it correct?
> >
> > If this is the case I assume createMessageStreams is the preferred way to
> > create streams for each topic if I have high throughput and low latency
> > demands. is my assumption correct?
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>

--
Regards,
Tao
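
P.S. To make the observation from the log concrete, here is a minimal sketch in plain Java (no Kafka dependency) of the behaviour I believe I am seeing: the same fixed set of threads is reused for every matched topic, and each topic's partitions are split across them in contiguous ranges. The class name SharedThreadSketch, the assign method and the "thread-N" labels are hypothetical, and the range-style split is my assumption inferred from the log, not something taken from the Kafka source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SharedThreadSketch {

    // Hypothetical model: split each topic's partitions across the SAME
    // threadTotal threads. Thread t gets a contiguous block of partitions;
    // lower-numbered threads absorb any remainder.
    public static Map<String, List<String>> assign(
            Map<String, Integer> partitionsPerTopic, int threadTotal) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (int t = 0; t < threadTotal; t++) {
            owned.put("thread-" + t, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            int partitions = e.getValue();
            int base = partitions / threadTotal;   // partitions every thread gets
            int extra = partitions % threadTotal;  // leftover partitions
            for (int t = 0; t < threadTotal; t++) {
                int start = base * t + Math.min(t, extra);
                int count = base + (t < extra ? 1 : 0);
                for (int p = start; p < start + count; p++) {
                    owned.get("thread-" + t).add(e.getKey() + "/" + p);
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Topic names and partition counts as they appear in the log above.
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("kafkatopic-5", 2);
        topics.put("kafkatopic-1", 1);
        topics.put("kafkatopic-4", 2);
        assign(topics, 2).forEach(
            (thread, parts) -> System.out.println(thread + " -> " + parts));
    }
}
```

With the three topics from the log and threadTotal = 2, this model puts partition 0 of all three topics on thread-0 and partition 1 of kafkatopic-5 and kafkatopic-4 on thread-1, which matches the "attempting to claim" lines and the warning that thread 1 has nothing to consume for kafkatopic-1.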