Thank you, Guozhang, for your detailed explanation. In your example createMessageStreamsByFilter("*C" => 3), since the threads are shared among topics, there may be a situation where all 3 threads get stuck on topic AC, e.g. the topic is empty and holds the consumer threads indefinitely (with consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do you think this situation can happen?
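To make sure I read your example correctly, here is a minimal sketch of the shared-thread assignment as I understand it. It is plain Java and does not call any Kafka API: the class name, the assign method, and the contiguous-range split are my own assumptions, chosen only because they reproduce the grouping in your message, and partitions are numbered from 1 to match your example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the Kafka implementation.
class SharedThreadAssignment {

    // Gives every shared thread a contiguous slice of each matched topic's partitions.
    static List<List<String>> assign(Map<String, Integer> partitionsPerTopic, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int base = partitions / numThreads;   // partitions every thread gets
            int extra = partitions % numThreads;  // the first 'extra' threads get one more
            for (int t = 0; t < numThreads; t++) {
                int start = t * base + Math.min(t, extra);
                int count = base + (t < extra ? 1 : 0);
                for (int p = start; p < start + count; p++) {
                    // Label partitions from 1, as in the example (assumption).
                    perThread.get(t).add(topic.getKey() + "-" + (p + 1));
                }
            }
        }
        return perThread;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("AC", 3);
        topics.put("BC", 6);
        List<List<String>> result = assign(topics, 3);
        for (int t = 0; t < result.size(); t++) {
            System.out.println("thread " + t + " -> " + result.get(t));
        }
    }
}
```

With AC at 3 partitions, BC at 6 partitions and 3 threads, this gives AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6, so every thread owns partitions of both topics, which is the case my question is about.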

On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I was not clear before .. for createMessageStreamsByFilter each matched
> topic will have num-threads, but shared: i.e. there will be totally
> num-threads created, but each thread will be responsible for fetching all
> matched topics.
>
> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> partitions.
>
> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>
> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> respectively.
>
> Guozhang
>
> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Do you mean that each regex matched topic owns number of threads that get
> > passed in to createMessageStreamsByFilter ? For example in below code If I
> > have 3 matched topics each of which has 2 partitions then I should have
> > 3 * 2 = 6 threads in total with each topic owning 2 threads.
> >
> > TopicFilter filter = new Whitelist(".*");
> >
> > int threadTotal = 2;
> >
> > List<KafkaStream<byte[], byte[]>> streams = connector
> >     .createMessageStreamsByFilter(filter, threadTotal);
> >
> >
> > But what I observed from the log is different
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > partition 1
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker
> > partitions consumed by consumer thread
> > test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
> > test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> > partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> > List(test1234dd5_localhost-1423585444070-82f23758-0,
> > test1234dd5_localhost-1423585444070-82f23758-1)
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> > partition 1
> >
> > 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
> > test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> > partition 0
> >
> >
> > As you can see from the log there are only 2 threads created and shared
> > among 3 topics. With this setting I think the parallelism is degraded and
> > a slow topic may impact other topics' consumption performance. Any
> > thoughts?
> >
> > On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > createMessageStreams is used for consuming from specific topic(s), where
> > > you can put a map of [topic-name, num-threads] as its input parameters;
> > >
> > > createMessageStreamsByFilter is used for consuming from wildcard topics,
> > > where you can put a (regex, num-threads) as its input parameters, and for
> > > each regex matched topic num-threads will be created.
> > >
> > > The difference between these two are not really for throughput / latency,
> > > but rather consumption semantics.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > >
> > > > Hi team,
> > > >
> > > > I am comparing the differences between
> > > > ConsumerConnector.createMessageStreams
> > > > and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> > > > that createMessageStreams creates x number of threads (x is the number of
> > > > threads passed in to the method) dedicated to the specified topic
> > > > while createMessageStreamsByFilter creates x number of threads shared by
> > > > topics specified by TopicFilter. Is it correct?
> > > >
> > > > If this is the case I assume createMessageStreams is the preferred way to
> > > > create streams for each topic if I have high throughput and low latency
> > > > demands. is my assumption correct?
> > > >
> > > > --
> > > > Regards,
> > > > Tao
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > Regards,
> > Tao
>
> --
> -- Guozhang

--
Regards,
Tao