The number of fetchers is configurable via num.replica.fetchers. The
description of num.replica.fetchers in the Kafka documentation is not quite
accurate: num.replica.fetchers actually controls the maximum number of
fetchers per broker. In your case, with num.replica.fetchers=8 and 5 brokers,
that means no more than 8 fetchers are created for each broker.
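
To make the per-broker cap concrete, here is a minimal sketch in Java. It is
not Kafka's source code: the class ReplicaFetcherSketch, its two methods, and
the hashing rule are assumptions made for illustration only. The idea it
models is that each partition is mapped to one of num.replica.fetchers slots,
and one fetcher thread exists per (source broker, slot) pair that is in use.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not Kafka source: models the fetcher threads that one
// follower needs towards a single source broker.
class ReplicaFetcherSketch {

    // Assumed assignment rule: hash the topic-partition into one of
    // numFetchers slots. The real broker may use a different hash.
    static int fetcherId(String topic, int partition, int numFetchers) {
        return Math.floorMod(31 * topic.hashCode() + partition, numFetchers);
    }

    // Counts the distinct slots used by the partitions of one topic that are
    // all led by the same source broker. The result can never exceed
    // numFetchers, which is the cap described above.
    static int fetchersForBroker(String topic, int partitionCount, int numFetchers) {
        Set<Integer> slots = new HashSet<>();
        for (int p = 0; p < partitionCount; p++) {
            slots.add(fetcherId(topic, p, numFetchers));
        }
        return slots.size();
    }

    public static void main(String[] args) {
        int numFetchers = 8; // num.replica.fetchers from the question
        // Few partitions: fewer fetchers than the cap are needed.
        System.out.println(fetchersForBroker("AC", 3, numFetchers));
        // Many partitions: the count stops growing at the cap.
        System.out.println(fetchersForBroker("BC", 100, numFetchers));
    }
}
```

Under these assumptions, a source broker that leads only a handful of the
follower's partitions gets fewer than 8 fetchers, and a broker that leads
many partitions never gets more than 8.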

On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzak...@netzero.net> wrote:

> Is it always the case that there is only one fetcher per broker? Won't
> setting num.replica.fetchers greater than the number of brokers cause more
> fetchers per broker?
> Let's say I have 5 brokers and num.replica.fetchers is 8; will there be 2
> fetcher threads pulling from each broker?
>
> Thanks
> Zakee
>
>
>
> > On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
> >
> > Ah, I understand now. I didn't realize that there was one fetcher thread
> > per broker.
> >
> > Thanks Tao & Guozhang!
> > -James
> >
> >
> > On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> Fetcher threads work on a per-broker basis: there is at least one
> >> fetcher thread per broker. A fetcher thread sends the broker a fetch
> >> request that asks for all partitions. So if A, B, C are on the same
> >> broker, the fetcher thread is still able to fetch data from A, B, C even
> >> though A returns no data. The same logic applies to different brokers.
> >>
> >> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
> >>
> >>>
> >>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hi James,
> >>>>
> >>>> What I meant before is that a single fetcher may be responsible for
> >>>> putting fetched data into multiple queues according to the construction
> >>>> of the streams setup, where each queue may be consumed by a different
> >>>> thread. And the queues are actually bounded. Now say there are two
> >>>> queues that are getting data from the same fetcher F, and are consumed
> >>>> by two different user threads A and B. If thread A for some reason gets
> >>>> slowed / hung consuming data from queue 1, then queue 1 will eventually
> >>>> get full, and F trying to put more data into it will be blocked. Since
> >>>> F is parked trying to put data into queue 1, queue 2 will not get more
> >>>> data from it, and thread B may hence get starved. Does that make sense
> >>>> now?
> >>>>
> >>>
> >>> Yes, that makes sense. That is the scenario where one thread of a
> >>> consumer can cause a backup in the queue, which would cause other
> >>> threads to not receive data.
> >>>
> >>> What about the situation I described, where a thread consumes a queue
> >>> that is supposed to be filled with messages from multiple partitions? If
> >>> partition A has no messages and partitions B and C do, how will the
> >>> fetcher behave? Will the processing thread receive messages from
> >>> partitions B and C?
> >>>
> >>> Thanks,
> >>> -James
> >>>
> >>>
> >>>> Guozhang
> >>>>
> >>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Sorry to bring up this old thread, but my question is about this
> exact
> >>>>> thing:
> >>>>>
> >>>>> Guozhang, you said:
> >>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
> >>> BC: 6
> >>>>>> partitions.
> >>>>>>
> >>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
> >>> will
> >>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>> respectively;
> >>>>>>
> >>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> will
> >>> be
> >>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> AC-3/BC-5/BC-6
> >>>>>> respectively.
> >>>>>
> >>>>>
> >>>>> You said that in the createMessageStreamsByFilter case, if topic AC
> >>>>> had no messages in it and consumer.timeout.ms = -1, then the 3 threads
> >>>>> might all be blocked waiting for data to arrive from topic AC, and so
> >>>>> messages from BC would not be processed.
> >>>>>
> >>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> >>>>> same problem, only worse. Behind the scenes, is there a single thread
> >>>>> that is consuming (round-robin?) messages from the different
> >>>>> partitions and inserting them all into a single queue for the
> >>>>> application code to process? And is that why a single partition with
> >>>>> no messages will block the other messages from getting through?
> >>>>>
> >>>>> What about createMessageStreams("AC" => 1)? That creates a single
> >>>>> stream that contains messages from multiple partitions, which might be
> >>>>> on different brokers. Does that also suffer from the same problem,
> >>>>> where if one partition has no messages, the application would not
> >>>>> receive messages from the other partitions?
> >>>>>
> >>>>> Thanks,
> >>>>> -James
> >>>>>
> >>>>>
> >>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> The new consumer will be released in 0.9, which is targeted for the
> >>>>>> end of this quarter.
> >>>>>>
> >>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>>> Do you know when the new consumer API will be publicly available?
> >>>>>>>
> >>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <
> wangg...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Yes, it can get stuck. For example, AC and BC are processed by two
> >>>>>>>> different processes and the AC processor gets stuck; AC messages
> >>>>>>>> will then fill up the consumer's buffer and eventually prevent the
> >>>>>>>> fetcher thread from putting more data into it; the fetcher thread
> >>>>>>>> will be blocked on that and not be able to fetch BC.
> >>>>>>>>
> >>>>>>>> This issue has been addressed in the new consumer client, which is
> >>>>>>>> single-threaded with non-blocking APIs.
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
> >>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared
> >>>>>>>>> among topics, there may be a situation where all 3 threads get
> >>>>>>>>> stuck on topic AC (e.g. the topic is empty, which will hold the
> >>>>>>>>> connecting threads when consumer.timeout.ms=-1), hence there is no
> >>>>>>>>> thread to serve topic BC. Do you think this situation will happen?
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <
> wangg...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> I was not clear before. For createMessageStreamsByFilter each
> >>>>>>>>>> matched topic will have num-threads, but shared: i.e. a total of
> >>>>>>>>>> num-threads will be created, and each thread will be responsible
> >>>>>>>>>> for fetching all matched topics.
> >>>>>>>>>>
> >>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> topic
> >>>>>>>> BC: 6
> >>>>>>>>>> partitions.
> >>>>>>>>>>
> >>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> >>> threads
> >>>>>>>> will
> >>>>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>>>>> respectively;
> >>>>>>>>>>
> >>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3
> threads
> >>>>>>> will
> >>>>>>>> be
> >>>>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> >>> AC-3/BC-5/BC-6
> >>>>>>>>>> respectively.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com
> >
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Guozhang,
> >>>>>>>>>>>
> >>>>>>>>>>> Do you mean that each regex-matched topic owns the number of
> >>>>>>>>>>> threads that gets passed in to createMessageStreamsByFilter? For
> >>>>>>>>>>> example, in the code below, if I have 3 matched topics each of
> >>>>>>>>>>> which has 2 partitions, then I should have 3 * 2 = 6 threads in
> >>>>>>>>>>> total, with each topic owning 2 threads.
> >>>>>>>>>>>
> >>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>>>>
> >>>>>>>>>>> int threadTotal = 2;
> >>>>>>>>>>>
> >>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> But what I observed from the log is different
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>> following
> >>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
> >>>>>>> consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to
> claim
> >>>>>>>>>>> partition 1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>> following
> >>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with
> consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No
> broker
> >>>>>>>>>>> partitions consumed by consumer thread
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
> >>>>>>> kafkatopic-1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>> following
> >>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with
> >>>>>>> consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to
> claim
> >>>>>>>>>>> partition 1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> As you can see from the log there are only 2 threads created and
> >>>>>>>>>>> shared among 3 topics. With this setting I think the parallelism
> >>>>>>>>>>> is degraded and a slow topic may impact other topics'
> >>>>>>>>>>> consumption performance. Any thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <
> >>>>>>> wangg...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> createMessageStreams is used for consuming from specific
> >>>>>>>>>>>> topic(s), where you can put a map of [topic-name, num-threads]
> >>>>>>>>>>>> as its input parameters;
> >>>>>>>>>>>>
> >>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from
> >>>>>>>>>>>> wildcard topics, where you can put a (regex, num-threads) as
> >>>>>>>>>>>> its input parameters, and for each regex matched topic
> >>>>>>>>>>>> num-threads will be created.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The difference between these two is not really about
> >>>>>>>>>>>> throughput / latency, but rather consumption semantics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <
> xiaotao...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi team,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am comparing the differences between
> >>>>>>>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My
> >>>>>>>>>>>>> understanding is that createMessageStreams creates x threads
> >>>>>>>>>>>>> (x is the number of threads passed in to the method)
> >>>>>>>>>>>>> dedicated to the specified topic, while
> >>>>>>>>>>>>> createMessageStreamsByFilter creates x threads shared by the
> >>>>>>>>>>>>> topics specified by TopicFilter. Is that correct?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the
> >>>>>>>>>>>>> preferred way to create streams for each topic if I have high
> >>>>>>>>>>>>> throughput and low latency demands. Is my assumption correct?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Tao
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Regards,
> >>>>>>>>> Tao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Regards,
> >>>>>>> Tao
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >
>



-- 
Regards,
Tao
