Sorry, but I'm still confused. Is it the maximum number of threads (fetchers)
fetching from a leader, or the maximum number of threads within a follower
broker?

Thanks for clarifying,
-Zakee



> On Mar 12, 2015, at 11:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> 
> The number of fetchers is configurable via num.replica.fetchers. The
> description of num.replica.fetchers in the Kafka documentation is not quite
> accurate: num.replica.fetchers actually controls the maximum number of
> fetchers per broker. In your case, with num.replica.fetchers=8 and 5
> brokers, no more than 8 fetchers are created for each broker.
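To make "per broker" concrete, here is a minimal sketch. It assumes the follower keys each replica fetcher thread by (leader broker, fetcher id) and derives the fetcher id by hashing the topic and partition modulo num.replica.fetchers, which is roughly what the 0.8.x fetcher manager appears to do; treat that as an assumption, not documented behavior. Under this model a follower runs at most num.replica.fetchers threads per leader it pulls from, and each partition is fetched by exactly one of them. FetcherAssignment, fetcherId and fetchersPerLeader are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model (not Kafka source): a follower groups the partitions it
// replicates into fetcher threads keyed by (leader broker, fetcher id).
class FetcherAssignment {

    // Assumed fetcher id: hash of topic and partition, masked to be
    // non-negative, modulo num.replica.fetchers.
    static int fetcherId(String topic, int partition, int numReplicaFetchers) {
        return ((31 * topic.hashCode() + partition) & 0x7fffffff) % numReplicaFetchers;
    }

    // partitionLeaders maps partition number -> leader broker id for one topic.
    // Returns, per leader broker, the fetcher ids (threads) this follower uses.
    static Map<Integer, Set<Integer>> fetchersPerLeader(
            String topic, Map<Integer, Integer> partitionLeaders, int numReplicaFetchers) {
        Map<Integer, Set<Integer>> threads = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : partitionLeaders.entrySet()) {
            int id = fetcherId(topic, e.getKey(), numReplicaFetchers);
            threads.computeIfAbsent(e.getValue(), leader -> new TreeSet<>()).add(id);
        }
        return threads;
    }

    public static void main(String[] args) {
        // Hypothetical layout: this follower replicates 40 partitions of
        // "events"; brokers 1-4 each lead a contiguous block of 10.
        Map<Integer, Integer> leaders = new TreeMap<>();
        for (int p = 0; p < 40; p++) {
            leaders.put(p, 1 + p / 10);
        }
        fetchersPerLeader("events", leaders, 8).forEach((leader, ids) ->
                System.out.println("leader " + leader + " -> fetcher ids " + ids));
    }
}
```

In the 5-broker example, a follower pulling from the other 4 brokers with num.replica.fetchers=8 would, under these assumptions, run up to 8 threads per leader and at most 32 in total; how many are actually used depends on how the partitions hash.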
> 
> On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzak...@netzero.net> wrote:
> 
>> Is it always the case that there is only one fetcher per broker? Won’t
>> setting num.replica.fetchers greater than the number of brokers cause more
>> fetchers per broker?
>> Let’s say I have 5 brokers and num.replica.fetchers is 8: will there be 2
>> fetcher threads pulling from each broker?
>> 
>> Thanks
>> Zakee
>> 
>> 
>> 
>>> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
>>> 
>>> Ah, I understand now. I didn't realize that there was one fetcher thread
>>> per broker.
>>> 
>>> Thanks Tao & Guozhang!
>>> -James
>>> 
>>> 
>>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> 
>>>> Fetcher threads work on a per-broker basis, with at least one fetcher
>>>> thread per broker. A fetcher thread sends the broker a fetch request that
>>>> asks for all of its partitions at once. So if A, B and C are on the same
>>>> broker, the fetcher thread can still fetch data from B and C even though
>>>> A returns no data. The same logic applies to partitions on different
>>>> brokers.
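A toy model of the point above, with hypothetical names (FetchRound, fetch, deliver) and ignoring offsets, size limits and the broker's wait-for-data settings: because one request covers every partition the thread owns on that broker, an empty partition just contributes an empty batch and does not stop B and C from being delivered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one fetch round against a single broker.
class FetchRound {

    // One request covers every partition this fetcher owns on the broker;
    // the response carries a (possibly empty) batch for each of them.
    static Map<String, List<String>> fetch(Map<String, List<String>> brokerData,
                                           List<String> ownedPartitions) {
        Map<String, List<String>> response = new LinkedHashMap<>();
        for (String partition : ownedPartitions) {
            response.put(partition, brokerData.getOrDefault(partition, List.of()));
        }
        return response;
    }

    // Hands every message in the response to the consumer side; an empty
    // batch simply contributes nothing and does not stop the others.
    static List<String> deliver(Map<String, List<String>> response) {
        List<String> delivered = new ArrayList<>();
        response.forEach((partition, batch) -> {
            for (String message : batch) {
                delivered.add(partition + ":" + message);
            }
        });
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> brokerData = new HashMap<>();
        brokerData.put("A", List.of());            // partition A has no new data
        brokerData.put("B", List.of("b1", "b2"));
        brokerData.put("C", List.of("c1"));
        System.out.println(deliver(fetch(brokerData, List.of("A", "B", "C"))));
    }
}
```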
>>>> 
>>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
>>>> 
>>>>> 
>>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>> 
>>>>>> Hi James,
>>>>>> 
>>>>>> What I meant before is that a single fetcher may be responsible for
>>>>>> putting fetched data into multiple queues, depending on how the streams
>>>>>> are set up, where each queue may be consumed by a different thread. And
>>>>>> the queues are bounded. Now say there are two queues that get data from
>>>>>> the same fetcher F and are consumed by two different user threads, A and
>>>>>> B. If thread A for some reason slows down or hangs while consuming from
>>>>>> queue 1, queue 1 will eventually fill up, and F will block trying to put
>>>>>> more data into it. Since F is parked trying to put data into queue 1,
>>>>>> queue 2 will not get more data from it, and thread B may be starved.
>>>>>> Does that make sense now?
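The starvation scenario above can be simulated in a single thread. This is a hypothetical sketch, not Kafka code: fetcher F alternates chunks between two bounded queues, thread A never drains queue 1, and thread B drains queue 2 immediately. The real fetcher would call put() and park once queue 1 is full; the sketch uses offer() so that the point where F would block can be observed without hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical single-threaded simulation of fetcher F feeding two bounded
// queues. Thread A never drains queue 1; thread B drains queue 2 at once.
class FetcherStarvation {

    // Returns how many chunks reach B before F would block on queue 1.
    static int chunksReachingB(int queueCapacity, int chunksPerQueue) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(queueCapacity);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(queueCapacity);
        List<String> consumedByB = new ArrayList<>();

        for (int i = 0; i < chunksPerQueue; i++) {
            // offer() returning false marks where put() would park F.
            if (!queue1.offer("q1-chunk-" + i)) {
                break; // F is stuck on queue 1, so queue 2 gets nothing more
            }
            queue2.offer("q2-chunk-" + i);
            // B keeps up: drain queue 2 right away.
            queue2.drainTo(consumedByB);
        }
        return consumedByB.size();
    }

    public static void main(String[] args) {
        System.out.println("B received " + chunksReachingB(2, 100) + " of 100 chunks");
    }
}
```

B receives exactly as many chunks as queue 1 can hold, no matter how fast B itself is.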
>>>>>> 
>>>>> 
>>>>> Yes, that makes sense. That is the scenario where one thread of a
>>>>> consumer can cause a backup in its queue, which would prevent other
>>>>> threads from receiving data.
>>>>> 
>>>>> What about the situation I described, where a thread consumes a queue
>>>>> that is supposed to be filled with messages from multiple partitions? If
>>>>> partition A has no messages and partitions B and C do, how will the
>>>>> fetcher behave? Will the processing thread receive messages from
>>>>> partitions B and C?
>>>>> 
>>>>> Thanks,
>>>>> -James
>>>>> 
>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Sorry to bring up this old thread, but my question is about this exact
>>>>>>> thing:
>>>>>>> 
>>>>>>> Guozhang, you said:
>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>>>>> BC: 6 partitions.
>>>>>>>> 
>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads
>>>>>>>> will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6
>>>>>>>> respectively;
>>>>>>>> 
>>>>>>>> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads
>>>>>>>> will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and
>>>>>>>> AC-3/BC-5/BC-6 respectively.
>>>>>>> 
>>>>>>> 
>>>>>>> You said that in the createMessageStreamsByFilter case, if topic AC had
>>>>>>> no messages in it and consumer.timeout.ms = -1, then the 3 threads might
>>>>>>> all be blocked waiting for data to arrive from topic AC, and so messages
>>>>>>> from BC would not be processed.
>>>>>>> 
>>>>>>> createMessageStreamsByFilter("*C" => 1) (a single stream) would have the
>>>>>>> same problem, only worse. Behind the scenes, is there a single thread
>>>>>>> that consumes messages (round-robin?) from the different partitions and
>>>>>>> inserts them all into a single queue for the application code to
>>>>>>> process? Is that why a single partition with no messages will block the
>>>>>>> other messages from getting through?
>>>>>>> 
>>>>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
>>>>>>> that contains messages from multiple partitions, which might be on
>>>>>>> different brokers. Does that also suffer from the same problem, where if
>>>>>>> one partition has no messages, the application would not receive
>>>>>>> messages from the other partitions?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> -James
>>>>>>> 
>>>>>>> 
>>>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> The new consumer will be released in 0.9, which is targeted for the end
>>>>>>>> of this quarter.
>>>>>>>> 
>>>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Do you know when the new consumer API will be publicly available?
>>>>>>>>> 
>>>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Yes, it can get stuck. For example, if AC and BC are processed by two
>>>>>>>>>> different processes and the AC processor gets stuck, AC messages will
>>>>>>>>>> fill up the consumer's buffer and eventually prevent the fetcher
>>>>>>>>>> thread from putting more data into it; the fetcher thread will be
>>>>>>>>>> blocked on that and will not be able to fetch BC.
>>>>>>>>>> 
>>>>>>>>>> This issue has been addressed in the new consumer client, which is
>>>>>>>>>> single-threaded with non-blocking APIs.
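The sketch below does not reproduce the new consumer's internals; it only illustrates, under stated assumptions, why a loop that never blocks on one topic's full buffer keeps serving the other. In this hypothetical toy (NonBlockingLoop and run are made-up names), one thread "fetches" a record per topic each round but skips any topic whose buffer is already full, then processes what is ready. The AC processor is assumed stuck, so AC's buffer stays full, yet BC keeps flowing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of a non-blocking, single-threaded fetch loop:
// a topic whose buffer is full is skipped for that round instead of
// blocking the loop, so other topics keep flowing.
class NonBlockingLoop {

    static Map<String, Integer> run(int rounds, int bufferCapacity) {
        Map<String, Deque<Integer>> buffers = new LinkedHashMap<>();
        buffers.put("AC", new ArrayDeque<>());
        buffers.put("BC", new ArrayDeque<>());
        Map<String, Integer> processed = new LinkedHashMap<>();
        processed.put("AC", 0);
        processed.put("BC", 0);

        for (int round = 0; round < rounds; round++) {
            // "Fetch": add one record per topic, only if its buffer has room.
            for (Deque<Integer> buffer : buffers.values()) {
                if (buffer.size() < bufferCapacity) {
                    buffer.add(round);
                } // otherwise skip this topic this round rather than wait
            }
            // "Process": the AC processor is stuck, so only BC is drained.
            Deque<Integer> bc = buffers.get("BC");
            processed.merge("BC", bc.size(), Integer::sum);
            bc.clear();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(100, 2));
    }
}
```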
>>>>>>>>>> 
>>>>>>>>>> Guozhang
>>>>>>>>>> 
>>>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thank you, Guozhang, for your detailed explanation. In your example,
>>>>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared
>>>>>>>>>>> among topics, there may be a situation where all 3 threads get stuck
>>>>>>>>>>> on topic AC, e.g. because the topic is empty, which will hold the
>>>>>>>>>>> consuming threads (with consumer.timeout.ms=-1), so there is no
>>>>>>>>>>> thread left to serve topic BC. Do you think this situation can
>>>>>>>>>>> happen?
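A minimal model of the consumer.timeout.ms behavior in question, assuming the old stream iterator waits on its chunk queue with take() when the timeout is negative, and otherwise with a timed poll() that throws when nothing arrives. StreamIteratorModel, nextMessage and ConsumerTimeout are hypothetical names; the real consumer has its own timeout exception. With -1, a stream whose queue stays empty never returns control to its thread; with a positive timeout, the thread gets control back and can decide what to do.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a stream iterator waiting on its chunk queue.
class StreamIteratorModel {

    // Stand-in for the consumer's timeout exception.
    static class ConsumerTimeout extends RuntimeException {}

    // A negative timeout waits forever (like consumer.timeout.ms=-1);
    // otherwise wait at most timeoutMs and then hand control back.
    static String nextMessage(BlockingQueue<String> chunks, long timeoutMs) {
        try {
            if (timeoutMs < 0) {
                return chunks.take(); // never returns while the queue stays empty
            }
            String chunk = chunks.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (chunk == null) {
                throw new ConsumerTimeout();
            }
            return chunk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> emptyQueue = new LinkedBlockingQueue<>();
        try {
            nextMessage(emptyQueue, 100);
        } catch (ConsumerTimeout e) {
            System.out.println("timed out; the thread is free to do other work");
        }
    }
}
```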
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I was not clear before. For createMessageStreamsByFilter, the
>>>>>>>>>>>> num-threads are shared across all matched topics: a total of
>>>>>>>>>>>> num-threads threads will be created, and each thread will be
>>>>>>>>>>>> responsible for fetching all matched topics.
>>>>>>>>>>>> 
>>>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>>>>>>>>> BC: 6 partitions.
>>>>>>>>>>>> 
>>>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5
>>>>>>>>>>>> threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and
>>>>>>>>>>>> BC-4/5/6 respectively;
>>>>>>>>>>>> 
>>>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads
>>>>>>>>>>>> will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and
>>>>>>>>>>>> AC-3/BC-5/BC-6 respectively.
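Both allocations in this example fall out of a per-topic range split. Here is a sketch assuming the old high-level consumer's range-style assignment: each topic's partitions are cut into contiguous ranges over the threads, and the first (partitions % threads) threads take one extra. RangeAssignment, assign and assignShared are hypothetical names, and partitions are numbered from 1 to match the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-topic range assignment over consumer threads.
class RangeAssignment {

    // For each thread index, the partitions of `topic` it owns.
    static List<List<String>> assign(String topic, int numPartitions, int numThreads) {
        List<List<String>> owned = new ArrayList<>();
        int perThread = numPartitions / numThreads;
        int withExtra = numPartitions % numThreads; // these threads get one more
        for (int t = 0; t < numThreads; t++) {
            int start = perThread * t + Math.min(t, withExtra);
            int count = perThread + (t < withExtra ? 1 : 0);
            List<String> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(topic + "-" + (p + 1));
            }
            owned.add(parts);
        }
        return owned;
    }

    // Shared streams (the ByFilter case): split each topic separately, then
    // merge every thread's share across all matched topics.
    static List<List<String>> assignShared(Map<String, Integer> partitionsPerTopic,
                                           int numThreads) {
        List<List<String>> merged = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            merged.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            List<List<String>> perTopic = assign(e.getKey(), e.getValue(), numThreads);
            for (int t = 0; t < numThreads; t++) {
                merged.get(t).addAll(perTopic.get(t));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): each topic has its own threads.
        System.out.println(assign("AC", 3, 3)); // [[AC-1], [AC-2], [AC-3]]
        System.out.println(assign("BC", 6, 2)); // [[BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]

        // createMessageStreamsByFilter("*C" => 3): 3 threads shared by AC and BC.
        Map<String, Integer> matched = new LinkedHashMap<>();
        matched.put("AC", 3);
        matched.put("BC", 6);
        System.out.println(assignShared(matched, 3));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
    }
}
```

The same split is consistent with tao's log further down: with 2 shared threads, assign("kafkatopic-1", 1, 2) leaves thread 1 with an empty list, which corresponds to the WARN "No broker partitions consumed by consumer thread ...-1 for topic kafkatopic-1".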
>>>>>>>>>>>> 
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Do you mean that each regex-matched topic owns the number of threads
>>>>>>>>>>>>> passed in to createMessageStreamsByFilter? For example, in the code
>>>>>>>>>>>>> below, if I have 3 matched topics, each of which has 2 partitions,
>>>>>>>>>>>>> then I should have 3 * 2 = 6 threads in total, with each topic
>>>>>>>>>>>>> owning 2 threads.
>>>>>>>>>>>>> 
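>>>>>>>>>>>>> import java.util.List;
>>>>>>>>>>>>> import kafka.consumer.KafkaStream;
>>>>>>>>>>>>> import kafka.consumer.TopicFilter;
>>>>>>>>>>>>> import kafka.consumer.Whitelist;
>>>>>>>>>>>>> 
>>>>>>>>>>>>> // connector: an existing kafka.javaapi.consumer.ConsumerConnector
>>>>>>>>>>>>> TopicFilter filter = new Whitelist(".*"); // matches every topic
>>>>>>>>>>>>> int threadTotal = 2; // number of streams requested
>>>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams =
>>>>>>>>>>>>>         connector.createMessageStreamsByFilter(filter, threadTotal);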
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> But what I observed in the log is different:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>>>> partition 1
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>>>>>>>>> partitions consumed by consumer thread
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>>>>>>> partition 1
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>>>>>>> partition 0
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> As you can see from the log, only 2 threads are created and shared
>>>>>>>>>>>>> among the 3 topics. With this setting I think parallelism is
>>>>>>>>>>>>> degraded, and a slow topic may hurt the other topics' consumption
>>>>>>>>>>>>> performance. Any thoughts?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s),
>>>>>>>>>>>>>> where you pass a map of [topic-name, num-threads] as its input
>>>>>>>>>>>>>> parameter;
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
>>>>>>>>>>>>>> topics, where you pass a (regex, num-threads) pair as its input
>>>>>>>>>>>>>> parameters, and for each regex-matched topic num-threads will be
>>>>>>>>>>>>>> created.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The difference between these two is not really about throughput /
>>>>>>>>>>>>>> latency, but rather about consumption semantics.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi team,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am comparing the differences between
>>>>>>>>>>>>>>> ConsumerConnector.createMessageStreams and
>>>>>>>>>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding
>>>>>>>>>>>>>>> is that createMessageStreams creates x threads (x is the number of
>>>>>>>>>>>>>>> threads passed in to the method) dedicated to the specified topic,
>>>>>>>>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
>>>>>>>>>>>>>>> topics specified by the TopicFilter. Is that correct?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> If so, I assume createMessageStreams is the preferred way to create
>>>>>>>>>>>>>>> streams for each topic if I have high-throughput and low-latency
>>>>>>>>>>>>>>> demands. Is my assumption correct?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Tao
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>> 