Fetch data from a leader to the consumer. The replication fetcher is configured by
another property.

On Saturday, March 14, 2015, Zakee <kzak...@netzero.net> wrote:

> Sorry, but I'm still confused. Is it the maximum number of threads (fetchers) that fetch
> from a leader, or the maximum number of threads within a follower broker?
>
> Thanks for clarifying,
> -Zakee
>
>
>
> > On Mar 12, 2015, at 11:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > The number of fetchers is configurable via num.replica.fetchers. The
> > description of num.replica.fetchers in the Kafka documentation is not quite
> > accurate: num.replica.fetchers actually controls the maximum number of
> > fetchers per broker. In your case, with num.replica.fetchers=8 and 5 brokers,
> > this means no more than 8 fetchers are created for each broker.
> >
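A minimal sketch of what "per broker" means here, under stated assumptions: a follower is assumed to key each fetcher thread by (source broker, fetcher id) and to pick the fetcher id by hashing the topic and partition modulo num.replica.fetchers, roughly in the style of the 0.8-era fetcher manager. The class and method names, the topic name, and the partition-to-leader placement are all hypothetical. Under that model, 5 brokers with num.replica.fetchers=8 allow up to 8 fetcher threads toward each source broker, rather than 8 spread across all brokers.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of how partitions could be spread over fetcher threads.
// Assumption: a fetcher thread is identified by (sourceBroker, fetcherId).
public class FetcherIdSketch {

    // Assumed hash: non-negative (31 * topic.hashCode() + partition) mod numFetchers.
    static int fetcherId(String topic, int partition, int numFetchers) {
        return ((31 * topic.hashCode() + partition) & 0x7fffffff) % numFetchers;
    }

    // Number of distinct fetcher threads pulling from one source broker.
    // Made-up placement: partition p is led by broker p % numBrokers.
    static int fetchersForBroker(int broker, int numBrokers, int numPartitions,
                                 String topic, int numFetchers) {
        Set<Integer> ids = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numBrokers == broker) {
                ids.add(fetcherId(topic, p, numFetchers));
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        int numBrokers = 5;
        int numFetchers = 8; // num.replica.fetchers from the question above
        for (int b = 0; b < numBrokers; b++) {
            int n = fetchersForBroker(b, numBrokers, 200, "my-topic", numFetchers);
            System.out.println("broker " + b + ": " + n + " fetcher thread(s)");
        }
    }
}
```

Running it reports 8 fetcher threads for every broker: in this model the cap applies per source broker, and a broker leading fewer partitions than the cap simply gets fewer threads.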
> > On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzak...@netzero.net> wrote:
> >
> >> Is it always the case that there is only one fetcher per broker? Won’t
> >> setting num.replica.fetchers greater than the number of brokers cause more
> >> fetchers per broker?
> >> Let’s say I have 5 brokers and num.replica.fetchers is 8; will there be 2
> >> fetcher threads pulling from each broker?
> >>
> >> Thanks
> >> Zakee
> >>
> >>
> >>
> >>> On Mar 12, 2015, at 11:15 AM, James Cheng <jch...@tivo.com> wrote:
> >>>
> >>> Ah, I understand now. I didn't realize that there was one fetcher thread
> >>> per broker.
> >>>
> >>> Thanks Tao & Guozhang!
> >>> -James
> >>>
> >>>
> >>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>
> >>>> Fetcher threads work on a per-broker basis, which ensures that there is at least one
> >>>> fetcher thread per broker. A fetcher thread sends the broker a single fetch request
> >>>> asking for all partitions. So if A, B, and C are on the same broker, the fetcher
> >>>> thread is still able to fetch data from A, B, and C, even though A returns no
> >>>> data. The same logic applies to different brokers.
> >>>>
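A toy model of that behaviour, assuming each fetch round sends one request covering every partition the thread owns. Broker logs are plain in-memory lists, and `FetchRoundSketch` and `fetchAll` are hypothetical names; this is not Kafka's wire protocol. An empty partition A contributes nothing to the round but does not stop B and C from being returned.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: one fetch round asks for every partition the thread owns, so an
// empty partition yields no records but never stalls the others.
public class FetchRoundSketch {

    // brokerLog: partition name -> records stored on the (simulated) broker.
    // nextOffset: partition name -> next offset this fetcher still needs.
    static List<String> fetchAll(Map<String, List<String>> brokerLog,
                                 Map<String, Integer> nextOffset) {
        List<String> fetched = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : brokerLog.entrySet()) {
            String partition = e.getKey();
            List<String> log = e.getValue();
            // Empty or fully consumed partitions simply add nothing here.
            for (int i = nextOffset.getOrDefault(partition, 0); i < log.size(); i++) {
                fetched.add(partition + ":" + log.get(i));
            }
            nextOffset.put(partition, log.size());
        }
        return fetched;
    }

    public static void main(String[] args) {
        Map<String, List<String>> brokerLog = new LinkedHashMap<>();
        brokerLog.put("A", new ArrayList<>());                     // no messages
        brokerLog.put("B", new ArrayList<>(List.of("b0", "b1")));
        brokerLog.put("C", new ArrayList<>(List.of("c0")));
        Map<String, Integer> offsets = new LinkedHashMap<>();
        System.out.println(fetchAll(brokerLog, offsets)); // [B:b0, B:b1, C:c0]
    }
}
```

In this model the answer to James's question below is yes: the consuming thread gets B's and C's records while A stays empty, and A's records show up in a later round once they exist.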
> >>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:
> >>>>
> >>>>>
> >>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi James,
> >>>>>>
> >>>>>> What I meant before is that a single fetcher may be responsible for
> >>>>>> putting fetched data into multiple queues, depending on how the streams are
> >>>>>> set up, where each queue may be consumed by a different thread. And the
> >>>>>> queues are actually bounded. Now say there are two queues that are getting
> >>>>>> data from the same fetcher F and are consumed by two different user
> >>>>>> threads A and B. If thread A for some reason gets slowed down or hung while
> >>>>>> consuming data from queue 1, then queue 1 will eventually get full, and F,
> >>>>>> trying to put more data into it, will be blocked. Since F is parked trying
> >>>>>> to put data into queue 1, queue 2 will not get more data from it, and thread
> >>>>>> B may hence get starved. Does that make sense now?
> >>>>>>
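Guozhang's scenario can be reproduced in miniature with one fetcher thread feeding two bounded queues. This is a simplified model under stated assumptions, not the old consumer's actual code: `StarvationSketch`, the queue capacity, and the alternating `put` loop are hypothetical stand-ins for the fetcher and its per-stream queues. Thread A never drains queue 1, so once that queue is full the fetcher parks inside `put`, and thread B stops receiving anything from queue 2.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model: one fetcher F alternately feeds two bounded queues.
// Consumer A (queue 1) is "hung" and never takes; consumer B drains queue 2.
public class StarvationSketch {

    static final class Result {
        final int receivedByB;
        final Thread.State fetcherState;

        Result(int receivedByB, Thread.State fetcherState) {
            this.receivedByB = receivedByB;
            this.fetcherState = fetcherState;
        }
    }

    static Result run(int capacity, long observeMillis) {
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(capacity);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queue1.put(i); // parks here for good once queue1 is full
                    queue2.put(i); // so queue2 is never refilled after that
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        // Consumer B runs on the calling thread and drains queue2 for a while.
        int received = 0;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(observeMillis);
        while (System.nanoTime() < deadline) {
            try {
                if (queue2.poll(10, TimeUnit.MILLISECONDS) != null) {
                    received++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        Thread.State state = fetcher.getState();
        fetcher.interrupt();
        return new Result(received, state);
    }

    public static void main(String[] args) {
        Result r = run(2, 500);
        System.out.println("B received " + r.receivedByB + " item(s); fetcher state: " + r.fetcherState);
    }
}
```

With a capacity of 2, B receives exactly 2 items and then starves while the fetcher sits in the WAITING state, no matter how quickly B drains its own queue.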
> >>>>>
> >>>>> Yes, that makes sense. That is the scenario where one thread of a consumer
> >>>>> can cause a backup in the queue, which would cause other threads to not
> >>>>> receive data.
> >>>>>
> >>>>> What about the situation I described, where a thread consumes a queue that
> >>>>> is supposed to be filled with messages from multiple partitions? If
> >>>>> partition A has no messages and partitions B and C do, how will the fetcher
> >>>>> behave? Will the processing thread receive messages from partitions B and C?
> >>>>>
> >>>>> Thanks,
> >>>>> -James
> >>>>>
> >>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Sorry to bring up this old thread, but my question is about this exact
> >>>>>>> thing:
> >>>>>>>
> >>>>>>> Guozhang, you said:
> >>>>>>>> A more concrete example: say you have topic AC with 3 partitions and topic
> >>>>>>>> BC with 6 partitions.
> >>>>>>>>
> >>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will
> >>>>>>>> be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, and BC-4/5/6 respectively.
> >>>>>>>>
> >>>>>>>> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be
> >>>>>>>> created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, and AC-3/BC-5/BC-6
> >>>>>>>> respectively.
> >>>>>>>
> >>>>>>>
> >>>>>>> You said that in the createMessageStreamsByFilter case, if topic AC had no
> >>>>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
> >>>>>>> be blocked waiting for data to arrive from topic AC, and so messages from
> >>>>>>> BC would not be processed.
> >>>>>>>
> >>>>>>> createMessageStreamsByFilter("*C" => 1) (a single stream) would have the
> >>>>>>> same problem, just worse. Behind the scenes, is there a single thread
> >>>>>>> that is consuming (round-robin?) messages from the different partitions and
> >>>>>>> inserting them all into a single queue for the application code to process?
> >>>>>>> And is that why a single partition with no messages would block the other
> >>>>>>> messages from getting through?
> >>>>>>>
> >>>>>>> What about createMessageStreams("AC" => 1)? That creates a single stream
> >>>>>>> that contains messages from multiple partitions, which might be on
> >>>>>>> different brokers. Does that also suffer from the same problem, where if one
> >>>>>>> partition has no messages, the application would not receive messages
> >>>>>>> from the other partitions?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -James
> >>>>>>>
> >>>>>>>
> >>>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> The new consumer will be released in 0.9, which is targeted for the end of
> >>>>>>>> this quarter.
> >>>>>>>>
> >>>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Do you know when the new consumer API will be publicly available?
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Yes, it can get stuck. For example, if AC and BC are processed by two
> >>>>>>>>>> different processes and the AC processor gets stuck, AC messages will
> >>>>>>>>>> fill up the consumer's buffer and eventually prevent the fetcher thread
> >>>>>>>>>> from putting more data into it; the fetcher thread will be blocked on that
> >>>>>>>>>> and not be able to fetch BC.
> >>>>>>>>>>
> >>>>>>>>>> This issue has been addressed in the new consumer client, which is
> >>>>>>>>>> single-threaded with non-blocking APIs.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thank you Guozhang for your detailed explanation. In your example,
> >>>>>>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
> >>>>>>>>>>> topics, there may be a situation where all 3 threads get stuck on
> >>>>>>>>>>> topic AC, e.g. because the topic is empty, which would hold the consuming
> >>>>>>>>>>> threads (with consumer.timeout.ms=-1), so there is no thread left to serve
> >>>>>>>>>>> topic BC. Do you think this situation will happen?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I was not clear before: for createMessageStreamsByFilter, each matched
> >>>>>>>>>>>> topic will have num-threads, but shared; i.e. a total of num-threads
> >>>>>>>>>>>> threads will be created, but each thread will be responsible for fetching
> >>>>>>>>>>>> all matched topics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A more concrete example: say you have topic AC with 3 partitions and topic
> >>>>>>>>>>>> BC with 6 partitions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will
> >>>>>>>>>>>> be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, and BC-4/5/6 respectively.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be
> >>>>>>>>>>>> created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, and AC-3/BC-5/BC-6
> >>>>>>>>>>>> respectively.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
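The split in this example is consistent with a range-style assignment done per topic: each topic's partitions are cut into contiguous ranges over the streams that consume it, with earlier streams taking any remainder. The sketch below assumes that rule rather than reproducing the old consumer's source, and `RangeAssignSketch`, `assign`, and `sharedStreams` are hypothetical names. With createMessageStreams each topic gets its own streams; with createMessageStreamsByFilter the same N streams are reused for every matched topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of range-style assignment: a topic's partitions
// (numbered from 1, as in the thread) are split into contiguous ranges
// over `streams` streams; earlier streams absorb any remainder.
public class RangeAssignSketch {

    static List<List<Integer>> assign(int partitions, int streams) {
        List<List<Integer>> result = new ArrayList<>();
        int perStream = partitions / streams;
        int extra = partitions % streams; // the first `extra` streams get one more
        int next = 1;
        for (int s = 0; s < streams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                range.add(next++);
            }
            result.add(range); // may be empty when streams > partitions
        }
        return result;
    }

    // createMessageStreamsByFilter-style: the same streams serve every topic.
    static List<List<String>> sharedStreams(Map<String, Integer> topics, int streams) {
        List<List<String>> perStream = new ArrayList<>();
        for (int s = 0; s < streams; s++) {
            perStream.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            List<List<Integer>> ranges = assign(t.getValue(), streams);
            for (int s = 0; s < streams; s++) {
                for (int p : ranges.get(s)) {
                    perStream.get(s).add(t.getKey() + "-" + p);
                }
            }
        }
        return perStream;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): dedicated streams per topic
        System.out.println("AC: " + assign(3, 3)); // AC: [[1], [2], [3]]
        System.out.println("BC: " + assign(6, 2)); // BC: [[1, 2, 3], [4, 5, 6]]

        // createMessageStreamsByFilter("*C" => 3): 3 streams shared by AC and BC
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("AC", 3);
        topics.put("BC", 6);
        System.out.println(sharedStreams(topics, 3));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
    }
}
```

The same rule also fits Tao's log further down: kafkatopic-1 has a single partition and two streams, and `assign(1, 2)` yields `[[1], []]`, so the second stream gets nothing for that topic, which lines up with the "No broker partitions consumed by consumer thread ...-1" warning (the sketch numbers partitions from 1, the log from 0).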
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Do you mean that each regex-matched topic owns the number of threads that
> >>>>>>>>>>>>> get passed in to createMessageStreamsByFilter? For example, in the code
> >>>>>>>>>>>>> below, if I have 3 matched topics, each of which has 2 partitions, then I
> >>>>>>>>>>>>> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> >>>>>>>>>>>>>
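> >>>>>>>>>>>>> import java.util.List;
> >>>>>>>>>>>>> import kafka.consumer.KafkaStream;
> >>>>>>>>>>>>> import kafka.consumer.TopicFilter;
> >>>>>>>>>>>>> import kafka.consumer.Whitelist;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // `connector` is assumed to be an existing kafka.javaapi.consumer.ConsumerConnector.
> >>>>>>>>>>>>> TopicFilter filter = new Whitelist(".*"); // regex matching every topic
> >>>>>>>>>>>>> int threadTotal = 2;                      // number of streams to create
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams =
> >>>>>>>>>>>>>     connector.createMessageStreamsByFilter(filter, threadTotal);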
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> But what I observed in the log is different:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>>>>>>>>> partitions consumed by consumer thread
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As you can see from the log, only 2 threads are created, and they are shared
> >>>>>>>>>>>>> among 3 topics. With this setting I think the parallelism is degraded, and a
> >>>>>>>>>>>>> slow topic may impact other topics' consumption performance. Any thoughts?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where
> >>>>>>>>>>>>>> you pass a map of [topic-name, num-threads] as its input parameter;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
> >>>>>>>>>>>>>> where you pass (regex, num-threads) as its input parameters, and for
> >>>>>>>>>>>>>> each regex-matched topic num-threads will be created.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The difference between these two is not really about throughput /
> >>>>>>>>>>>>>> latency, but rather about consumption semantics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi team,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am comparing the differences between ConsumerConnector.createMessageStreams
> >>>>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> >>>>>>>>>>>>>>> that createMessageStreams creates x threads (x is the number of threads
> >>>>>>>>>>>>>>> passed in to the method) dedicated to the specified topic, while
> >>>>>>>>>>>>>>> createMessageStreamsByFilter creates x threads shared by the topics
> >>>>>>>>>>>>>>> specified by the TopicFilter. Is that correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred way to
> >>>>>>>>>>>>>>> create streams for each topic if I have high-throughput and low-latency
> >>>>>>>>>>>>>>> demands. Is my assumption correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Tao
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Regards,
> >>>>>>>>> Tao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>
> >>
> >
> >
> >
> > --
> > Regards,
> > Tao
>
>

-- 
Regards,
Tao
