Fetcher threads are created on a per-broker basis: there is at least one fetcher thread per broker. Each fetcher thread sends its broker a single fetch request that asks for all of the partitions it is responsible for on that broker. So if A, B, and C are on the same broker, the fetcher thread can still fetch data from A, B, and C even when A returns no data. The same logic applies across different brokers.
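To make the point above concrete, here is a toy sketch in plain Java. It is not Kafka code: FetcherSketch, fetchOnce and deliver are hypothetical names invented for this illustration, and each partition is modelled as a simple in-memory queue. It only shows that one fetch covering partitions A, B and C still returns the data from B and C when A is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model only: none of these classes or methods are part of Kafka.
class FetcherSketch {

    // One "fetch request": collect whatever each partition on the broker has.
    // An empty partition simply contributes nothing to the response.
    static List<String> fetchOnce(Map<String, Queue<String>> partitionsOnBroker) {
        List<String> response = new ArrayList<>();
        for (Map.Entry<String, Queue<String>> entry : partitionsOnBroker.entrySet()) {
            Queue<String> log = entry.getValue();
            while (!log.isEmpty()) {
                response.add(entry.getKey() + ":" + log.poll());
            }
        }
        return response;
    }

    // Hand the fetched messages to the stream's bounded queue.
    // offer() is used so the sketch never hangs; it returns how many messages fit.
    static int deliver(List<String> response, BlockingQueue<String> streamQueue) {
        int delivered = 0;
        for (String message : response) {
            if (!streamQueue.offer(message)) {
                break; // queue is full: a blocking put() would park the fetcher here
            }
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, Queue<String>> broker = new LinkedHashMap<>();
        broker.put("A", new ArrayDeque<>());                           // no messages
        broker.put("B", new ArrayDeque<>(Arrays.asList("b1", "b2")));
        broker.put("C", new ArrayDeque<>(Arrays.asList("c1")));

        BlockingQueue<String> streamQueue = new ArrayBlockingQueue<>(10);
        int delivered = deliver(fetchOnce(broker), streamQueue);
        System.out.println(delivered + " messages queued: " + streamQueue);
    }
}
```

The bounded queue is there because of the other scenario Guozhang described: in this sketch a full queue just stops delivery, whereas a fetcher that blocks on a full queue would also stop feeding any other queue it serves.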

On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:

>
> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi James,
> >
> > What I meant before is that a single fetcher may be responsible for putting fetched data to multiple queues according to the construction of the streams setup, where each queue may be consumed by a different thread. And the queues are actually bounded. Now say there are two queues that are getting data from the same fetcher F, and are consumed by two different user threads A and B. If thread A for some reason got slowed / hung consuming data from queue 1, then queue 1 will eventually get full, and F trying to put more data to it will be blocked. Since F is parked on trying to put data to queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now?
> >
>
> Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would cause other threads to not receive data.
>
> What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?
>
> Thanks,
> -James
>
>
> > Guozhang
> >
> > On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> >
> >> Hi,
> >>
> >> Sorry to bring up this old thread, but my question is about this exact thing:
> >>
> >> Guozhang, you said:
> >>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
> >>>
> >>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>
> >>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.
> >>
> >>
> >> You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed.
> >>
> >> createMessageStreamsByFilter("*C" => 1) (single stream) would have the same problem but just worse. Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through?
> >>
> >> What about createMessageStreams("AC" => 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer the same problem, where if one partition has no messages, the application would not receive messages from the other partitions?
> >>
> >> Thanks,
> >> -James
> >>
> >>
> >> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> The new consumer will be released in 0.9, which is targeted for end of this quarter.
> >>>
> >>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>
> >>>> Do you know when the new consumer API will be publicly available?
> >>>>
> >>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>>> Yes, it can get stuck. For example, AC and BC are processed by two different processes and the AC processor gets stuck, hence AC messages will fill up the consumer's buffer and eventually prevent the fetcher thread from putting more data into it; the fetcher thread will be blocked on that and not be able to fetch BC.
> >>>>>
> >>>>> This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>
> >>>>>> Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter("*C" => 3), since threads are shared among topics, there may be a situation where all 3 threads get stuck with topic AC (e.g. the topic is empty, which will hold the connecting threads when consumer.timeout.ms=-1), hence there is no thread to serve topic BC. Do you think this situation will happen?
> >>>>>>
> >>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>>> I was not clear before .. for createMessageStreamsByFilter each matched topic will have num-threads, but shared: i.e. a total of num-threads will be created, but each thread will be responsible for fetching all matched topics.
> >>>>>>>
> >>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
> >>>>>>>
> >>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>>>>>
> >>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Guozhang,
> >>>>>>>>
> >>>>>>>> Do you mean that each regex-matched topic owns the number of threads that gets passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics each of which has 2 partitions, then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> >>>>>>>>
> >>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>
> >>>>>>>> int threadTotal = 2;
> >>>>>>>>
> >>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>>     .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> But what I observed from the log is different:
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> As you can see from the log, there are only 2 threads created and shared among 3 topics. With this setting I think the parallelism is degraded and a slow topic may impact other topics' consumption performance. Any thoughts?
> >>>>>>>>
> >>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where you can put a map of [topic-name, num-threads] as its input parameters;
> >>>>>>>>>
> >>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics, where you can put a (regex, num-threads) as its input parameters, and for each regex matched topic num-threads will be created.
> >>>>>>>>>
> >>>>>>>>> The difference between these two is not really for throughput / latency, but rather consumption semantics.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi team,
> >>>>>>>>>>
> >>>>>>>>>> I am comparing the differences between ConsumerConnector.createMessageStreams and ConsumerConnector.createMessageStreamsByFilter. My understanding is that createMessageStreams creates x number of threads (x is the number of threads passed in to the method) dedicated to the specified topic, while createMessageStreamsByFilter creates x number of threads shared by topics specified by TopicFilter. Is it correct?
> >>>>>>>>>>
> >>>>>>>>>> If this is the case I assume createMessageStreams is the preferred way to create streams for each topic if I have high throughput and low latency demands. Is my assumption correct?
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Regards,
> >>>>>>>>>> Tao
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Regards,
> >>>>>>>> Tao
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>
> >>>>>> --
> >>>>>> Regards,
> >>>>>> Tao
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >
> > --
> > -- Guozhang
>

--
Regards,
Tao