Sorry, but I'm still confused. Is that the maximum number of threads (fetchers) fetching from a leader, or the maximum number of threads within a follower broker?
Thanks for clarifying,
-Zakee

> On Mar 12, 2015, at 11:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
> The number of fetchers is configurable via num.replica.fetchers. The
> description of num.replica.fetchers in the Kafka documentation is not quite
> accurate. num.replica.fetchers actually controls the max number of fetchers
> per broker. In your case, with num.replica.fetchers=8 and 5 brokers, that
> means no more than 8 fetchers are created for each broker.
>
> On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzak...@netzero.net> wrote:
>
>> Is it always the case that there is only one fetcher per broker? Won’t
>> setting num.replica.fetchers greater than number-of-brokers cause more
>> fetchers per broker?
>> Let’s say I have 5 brokers and the number of replica fetchers is 8; will
>> there be 2 fetcher threads pulling from each broker?
>>
>> Thanks
>> Zakee
>>
>>> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
>>>
>>> Ah, I understand now. I didn't realize that there was one fetcher thread
>>> per broker.
>>>
>>> Thanks Tao & Guozhang!
>>> -James
>>>
>>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>
>>>> Fetcher threads are on a per-broker basis, which ensures at least one
>>>> fetcher thread per broker. A fetcher thread sends the broker a fetch
>>>> request asking for all partitions. So if A, B, and C are on the same
>>>> broker, the fetcher thread is still able to fetch data from A, B, and C
>>>> even though A returns no data. The same logic applies across different
>>>> brokers.
>>>>
>>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
>>>>
>>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> What I meant before is that a single fetcher may be responsible for
>>>>>> putting fetched data to multiple queues according to the construction
>>>>>> of the streams setup, where each queue may be consumed by a different
>>>>>> thread.
>>>>>> And the queues are actually bounded. Now say there are two queues
>>>>>> that are getting data from the same fetcher F, and are consumed by two
>>>>>> different user threads A and B. If thread A for some reason got slowed
>>>>>> / hung consuming data from queue 1, then queue 1 will eventually get
>>>>>> full, and F trying to put more data to it will be blocked. Since F is
>>>>>> parked on trying to put data to queue 1, queue 2 will not get more
>>>>>> data from it, and thread B may hence get starved. Does that make sense
>>>>>> now?
>>>>>>
>>>>>
>>>>> Yes, that makes sense. That is the scenario where one thread of a
>>>>> consumer can cause a backup in the queue, which would cause other
>>>>> threads to not receive data.
>>>>>
>>>>> What about the situation I described, where a thread consumes a queue
>>>>> that is supposed to be filled with messages from multiple partitions?
>>>>> If partition A has no messages and partitions B and C do, how will the
>>>>> fetcher behave? Will the processing thread receive messages from
>>>>> partitions B and C?
>>>>>
>>>>> Thanks,
>>>>> -James
>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Sorry to bring up this old thread, but my question is about this
>>>>>>> exact thing:
>>>>>>>
>>>>>>> Guozhang, you said:
>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>>>>> BC: 6 partitions.
>>>>>>>>
>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
>>>>>>>> will be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
>>>>>>>> respectively;
>>>>>>>>
>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
>>>>>>>> will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
>>>>>>>> AC-3/BC-5/BC-6 respectively.
>>>>>>>
>>>>>>> You said that in the createMessageStreamsByFilter case, if topic AC
>>>>>>> had no messages in it and consumer.timeout.ms = -1, then the 3
>>>>>>> threads might all be blocked waiting for data to arrive from topic
>>>>>>> AC, and so messages from BC would not be processed.
>>>>>>>
>>>>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have
>>>>>>> the same problem but just worse. Behind the scenes, is there a single
>>>>>>> thread that is consuming (round-robin?) messages from the different
>>>>>>> partitions and inserting them all into a single queue for the
>>>>>>> application code to process? And is that why a single partition with
>>>>>>> no messages will block the other messages from getting through?
>>>>>>>
>>>>>>> What about createMessageStreams("AC" => 1)? That creates a single
>>>>>>> stream that contains messages from multiple partitions, which might
>>>>>>> be on different brokers. Does that also suffer the same problem,
>>>>>>> where if one partition has no messages, the application would not
>>>>>>> receive messages from the other partitions?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -James
>>>>>>>
>>>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The new consumer will be released in 0.9, which is targeted for the
>>>>>>>> end of this quarter.
>>>>>>>>
>>>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, it can get stuck.
>>>>>>>>>> For example, AC and BC are processed by two different processes
>>>>>>>>>> and the AC processor gets stuck, hence AC messages will fill up
>>>>>>>>>> the consumer's buffer and eventually prevent the fetcher thread
>>>>>>>>>> from putting more data into it; the fetcher thread will be blocked
>>>>>>>>>> on that and not be able to fetch BC.
>>>>>>>>>>
>>>>>>>>>> This issue has been addressed in the new consumer client, which is
>>>>>>>>>> single-threaded with non-blocking APIs.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared
>>>>>>>>>>> among topics, there may be a situation where all 3 threads get
>>>>>>>>>>> stuck with topic AC, e.g. the topic is empty, which will hold the
>>>>>>>>>>> connecting threads (setting consumer.timeout.ms=-1), hence there
>>>>>>>>>>> is no thread to serve topic BC. Do you think this situation will
>>>>>>>>>>> happen?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I was not clear before .. for createMessageStreamsByFilter each
>>>>>>>>>>>> matched topic will have num-threads, but shared: i.e. there will
>>>>>>>>>>>> be num-threads created in total, but each thread will be
>>>>>>>>>>>> responsible for fetching all matched topics.
>>>>>>>>>>>>
>>>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
>>>>>>>>>>>> topic BC: 6 partitions.
>>>>>>>>>>>>
>>>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
>>>>>>>>>>>> threads will be created, and consuming
>>>>>>>>>>>> AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>>>>>>>
>>>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
>>>>>>>>>>>> will be created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
>>>>>>>>>>>> AC-3/BC-5/BC-6 respectively.
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do you mean that each regex-matched topic owns the number of
>>>>>>>>>>>>> threads that gets passed in to createMessageStreamsByFilter? For
>>>>>>>>>>>>> example, in the code below, if I have 3 matched topics, each of
>>>>>>>>>>>>> which has 2 partitions, then I should have 3 * 2 = 6 threads in
>>>>>>>>>>>>> total, with each topic owning 2 threads.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>>>>>>
>>>>>>>>>>>>> int threadTotal = 2;
>>>>>>>>>>>>>
>>>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>>>>>>     .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>>>>>>
>>>>>>>>>>>>> But what I observed from the log is different
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>>>> partition 1
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker
>>>>>>>>>>>>> partitions consumed by consumer thread
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>>>> partition 1
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>>
>>>>>>>>>>>>> As you can see from the log, there are only 2 threads created and
>>>>>>>>>>>>> shared among 3 topics. With this setting I think the parallelism is
>>>>>>>>>>>>> degraded and a slow topic may impact other topics' consumption
>>>>>>>>>>>>> performance. Any thoughts?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s),
>>>>>>>>>>>>>> where you can put a map of [topic-name, num-threads] as its input
>>>>>>>>>>>>>> parameters;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
>>>>>>>>>>>>>> topics, where you can put a (regex, num-threads) as its input
>>>>>>>>>>>>>> parameters, and for each regex matched topic num-threads will be
>>>>>>>>>>>>>> created.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The difference between these two is not really about throughput /
>>>>>>>>>>>>>> latency, but rather consumption semantics.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am comparing the differences between
>>>>>>>>>>>>>>> ConsumerConnector.createMessageStreams
>>>>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My
>>>>>>>>>>>>>>> understanding is that createMessageStreams creates x number of
>>>>>>>>>>>>>>> threads (x is the number of threads passed in to the method)
>>>>>>>>>>>>>>> dedicated to the specified topic, while
>>>>>>>>>>>>>>> createMessageStreamsByFilter creates x number of threads shared
>>>>>>>>>>>>>>> by the topics specified by TopicFilter. Is that correct?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the
>>>>>>>>>>>>>>> preferred way to create streams for each topic if I have high
>>>>>>>>>>>>>>> throughput and low latency demands. Is my assumption correct?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Tao
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Tao
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Regards,
>>>>>>>>>>> Tao
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Tao
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
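
For readers following the thread, the blocking scenario Guozhang describes (one fetcher F feeding two bounded queues while the consumer of queue 1 is hung) can be sketched with plain java.util.concurrent queues. This is only an illustration under stated assumptions, not the old consumer's actual implementation: the class and method names are hypothetical, thread B is modeled as draining queue 2 immediately, and a timed offer stands in for the fetcher's blocking put so that the demo terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Kafka.
class FetcherStarvationSketch {

    /**
     * Simulates one fetcher F feeding two bounded queues. The consumer of
     * queue 1 (thread A) is hung and never drains it; the consumer of
     * queue 2 (thread B) drains immediately. Returns how many messages B
     * received before F got stuck on queue 1.
     */
    static int messagesDeliveredToB(int queueCapacity, int rounds, long putTimeoutMs) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(queueCapacity);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(queueCapacity);
        int deliveredToB = 0;
        try {
            for (int i = 0; i < rounds; i++) {
                // Assumption: a timed offer stands in for the blocking put,
                // so the sketch gives up instead of hanging forever.
                boolean accepted = queue1.offer("AC-" + i, putTimeoutMs, TimeUnit.MILLISECONDS);
                if (!accepted) {
                    break; // F is parked on queue 1 and never reaches queue 2 again
                }
                queue2.put("BC-" + i);        // F hands B its message
                if (queue2.poll() != null) {  // B is healthy and consumes right away
                    deliveredToB++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop
        }
        return deliveredToB;
    }

    public static void main(String[] args) {
        int delivered = messagesDeliveredToB(2, 10, 50);
        System.out.println("B received " + delivered + " of 10 messages before F blocked");
    }
}
```

With a queue capacity of 2, B receives only the first 2 of 10 messages: once queue 1 is full, F cannot make progress, so queue 2 stops receiving data even though B is healthy.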