Hi James,

What I meant before is that a single fetcher may be responsible for putting
fetched data into multiple queues, depending on how the streams are set up,
and each queue may be consumed by a different thread. The queues are also
bounded. Now say two queues get data from the same fetcher F and are
consumed by two different user threads, A and B. If thread A is slowed down
or hung while consuming from queue 1, queue 1 will eventually fill up, and F
will block when it tries to put more data into it. Since F is parked on
queue 1, queue 2 gets no more data from it, and thread B may therefore
starve. Does that make sense now?
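To make the scenario concrete, here is a minimal Java sketch built on plain java.util.concurrent queues rather than Kafka's own fetcher classes. The class name, the queue capacity of 2, and the message labels are arbitrary assumptions for illustration. One fetcher thread F feeds two bounded queues; nothing drains queue 1 (thread A is hung), so F parks in put() and thread B only ever sees what arrived before that.

```java
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: models the blocking behaviour, not Kafka's implementation.
public class FetcherStarvation {

    /** Returns how many messages reach thread B's queue before fetcher F gets stuck. */
    static int messagesReceivedByB() {
        // Two bounded queues fed by one fetcher F (capacity 2 is an arbitrary choice).
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(2); // read by thread A
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(2); // read by thread B

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queue1.put("AC-" + i); // blocks once queue1 is full
                    queue2.put("BC-" + i); // so queue2 is never fed again after that
                }
            } catch (InterruptedException e) {
                // interrupted while parked: stop fetching
            }
        }, "fetcher-F");
        fetcher.setDaemon(true);
        fetcher.start();

        try {
            // Thread A is hung and never drains queue1; wait until F parks in put().
            while (fetcher.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Thread B drains everything F delivered to queue2 before it got stuck.
        int received = queue2.drainTo(new ArrayList<>());
        fetcher.interrupt();
        return received;
    }

    public static void main(String[] args) {
        System.out.println("B received " + messagesReceivedByB() + " messages, then starved");
    }
}
```

F delivers BC-0 and BC-1, then parks forever trying to put AC-2 into the full queue 1, so B receives exactly 2 messages no matter how fast it consumes.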

Guozhang

On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:

> Hi,
>
> Sorry to bring up this old thread, but my question is about this exact
> thing:
>
> Guozhang, you said:
> > A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> > partitions.
> >
> > With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> > be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively;
> >
> > With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> > created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> > respectively.
>
>
> You said that in the createMessageStreamsByFilter case, if topic AC had no
> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
> be blocked waiting for data to arrive from topic AC, and so messages from
> BC would not be processed.
>
> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> same problem but just worse. Behind the scenes, is there a single thread
> that is consuming (round-robin?) messages from the different partitions and
> inserting them all into a single queue for the application code to process?
> And is that why a single partition with no messages will block the other
> messages from getting through?
>
> What about createMessageStreams("AC" => 1)? That creates a single stream
> that contains messages from multiple partitions, which might be on
> different brokers. Does that also suffer from the same problem, where if
> one partition has no messages, the application would not receive messages
> from the other partitions?
>
> Thanks,
> -James
>
>
> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > The new consumer will be released in 0.9, which is targeted for the end
> > of this quarter.
> >
> > On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> Do you know when the new consumer API will be publicly available?
> >>
> >> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >>> Yes, it can get stuck. For example, say AC and BC are processed by two
> >>> different processes and the AC processor gets stuck. AC messages will
> >>> then fill up the consumer's buffer and eventually prevent the fetcher
> >>> thread from putting more data into it; the fetcher thread will be
> >>> blocked on that and not be able to fetch BC.
> >>>
> >>> This issue has been addressed in the new consumer client, which is
> >>> single-threaded with non-blocking APIs.
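For contrast, here is a sketch of why a non-blocking handoff avoids this kind of stall. It is not how the new consumer is implemented; it only assumes a single thread that uses BlockingQueue.offer(), which returns false instead of blocking when a queue is full, and that advances a per-topic fetch position (a hypothetical counter here) only when the handoff succeeds. A stuck AC consumer then stalls only AC, BC keeps flowing, and no AC data is dropped because the failed fetch is simply retried later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: a generic non-blocking handoff, not the new consumer's code.
public class NonBlockingFetchLoop {

    /** Runs the single-threaded loop; returns the next position to fetch for {AC, BC}. */
    static long[] run(int rounds) {
        BlockingQueue<String> acQueue = new ArrayBlockingQueue<>(2); // AC consumer is stuck
        BlockingQueue<String> bcQueue = new ArrayBlockingQueue<>(2); // BC consumer keeps up
        long acPosition = 0;
        long bcPosition = 0;

        for (int i = 0; i < rounds; i++) {
            // offer() returns false instead of blocking when the queue is full.
            // The position only advances on success, so nothing is dropped:
            // AC is effectively paused and retried on a later round.
            if (acQueue.offer("AC@" + acPosition)) acPosition++;
            if (bcQueue.offer("BC@" + bcPosition)) bcPosition++;

            bcQueue.poll(); // the BC consumer drains its queue each round
        }
        return new long[] {acPosition, bcPosition};
    }

    public static void main(String[] args) {
        long[] positions = run(10);
        System.out.println("AC advanced to " + positions[0] + ", BC advanced to " + positions[1]);
    }
}
```

With 10 rounds, AC stops at position 2 (its queue is full and never drained) while BC reaches 10, so the stuck AC consumer no longer starves BC.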
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thank you Guozhang for your detailed explanation. In your example,
> >>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
> >>>> topics there may be a situation where all 3 threads get stuck on topic
> >>>> AC, e.g. because the topic is empty, which will hold the connecting
> >>>> threads (with consumer.timeout.ms=-1), so there is no thread left to
> >>>> serve topic BC. Do you think this situation will happen?
> >>>>
> >>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> I was not clear before. For createMessageStreamsByFilter, each matched
> >>>>> topic will have num-threads, but shared: i.e. there will be
> >>>>> num-threads created in total, and each thread will be responsible for
> >>>>> fetching all matched topics.
> >>>>>
> >>>>> A more concrete example: say you have topic AC: 3 partitions, topic
> >>>>> BC: 6 partitions.
> >>>>>
> >>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
> >>>>> will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6
> >>>>> respectively;
> >>>>>
> >>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will
> >>>>> be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> >>>>> respectively.
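The splits above are consistent with a per-topic range split: each topic's partitions are divided into contiguous blocks over the streams that consume that topic, and the first (partitions mod streams) streams take one extra partition. The Java sketch below assumes that rule (I believe it mirrors the old high-level consumer's default "range" strategy, but the class and method names are hypothetical and this is not Kafka's code) and reproduces both layouts. Partitions are numbered from 1 to match this email, whereas Kafka numbers them from 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-topic range split; not Kafka's assignment code.
public class RangeAssignmentSketch {

    /** Splits one topic's partitions (numbered from 1) into contiguous blocks over n streams. */
    static List<List<String>> assign(String topic, int partitions, int streams) {
        int perStream = partitions / streams;
        int extra = partitions % streams; // the first `extra` streams get one more
        List<List<String>> result = new ArrayList<>();
        int next = 1;
        for (int s = 0; s < streams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                owned.add(topic + "-" + next++);
            }
            result.add(owned);
        }
        return result;
    }

    /** createMessageStreams-style: each topic gets its own, dedicated streams. */
    static List<List<String>> dedicated(Map<String, Integer> partitionsByTopic,
                                        Map<String, Integer> streamsByTopic) {
        List<List<String>> all = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
            all.addAll(assign(e.getKey(), e.getValue(), streamsByTopic.get(e.getKey())));
        }
        return all;
    }

    /** createMessageStreamsByFilter-style: numStreams streams shared by every matched topic. */
    static List<List<String>> shared(Map<String, Integer> partitionsByTopic, int numStreams) {
        List<List<String>> all = new ArrayList<>();
        for (int s = 0; s < numStreams; s++) {
            all.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
            List<List<String>> split = assign(e.getKey(), e.getValue(), numStreams);
            for (int s = 0; s < numStreams; s++) {
                all.get(s).addAll(split.get(s));
            }
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);
        Map<String, Integer> streams = new LinkedHashMap<>();
        streams.put("AC", 3);
        streams.put("BC", 2);

        System.out.println(dedicated(partitions, streams));
        // [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]
        System.out.println(shared(partitions, 3));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
    }
}
```

Applying the same rule to a single-partition topic shared by two streams leaves the second stream with nothing, which is consistent with the WARN line for kafkatopic-1 in tao's log.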
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Guozhang,
> >>>>>>
> >>>>>> Do you mean that each regex-matched topic owns the number of threads
> >>>>>> passed in to createMessageStreamsByFilter? For example, in the code
> >>>>>> below, if I have 3 matched topics, each of which has 2 partitions,
> >>>>>> then I should have 3 * 2 = 6 threads in total, with each topic owning
> >>>>>> 2 threads.
> >>>>>>
> >>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>
> >>>>>> int threadTotal = 2;
> >>>>>>
> >>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>         .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>
> >>>>>>
> >>>>>> But what I observed in the log is different:
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>> partition 1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>> partitions consumed by consumer thread
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>> partition 1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>>
> >>>>>> As you can see from the log, only 2 threads are created and shared
> >>>>>> among the 3 topics. With this setting I think parallelism is degraded,
> >>>>>> and a slow topic may hurt the consumption performance of the other
> >>>>>> topics. Any thoughts?
> >>>>>>
> >>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> createMessageStreams is used for consuming from specific topic(s),
> >>>>>>> where you pass a map of [topic-name, num-threads] as its input
> >>>>>>> parameter;
> >>>>>>>
> >>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
> >>>>>>> topics, where you pass a (regex, num-threads) pair as its input
> >>>>>>> parameters, and for each regex-matched topic num-threads will be
> >>>>>>> created.
> >>>>>>>
> >>>>>>> The difference between these two is not really about throughput /
> >>>>>>> latency, but rather about consumption semantics.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi team,
> >>>>>>>>
> >>>>>>>> I am comparing ConsumerConnector.createMessageStreams and
> >>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding is
> >>>>>>>> that createMessageStreams creates x threads (x is the number of
> >>>>>>>> threads passed in to the method) dedicated to the specified topic,
> >>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
> >>>>>>>> topics specified by the TopicFilter. Is that correct?
> >>>>>>>>
> >>>>>>>> If this is the case, I assume createMessageStreams is the preferred
> >>>>>>>> way to create streams for each topic if I have high-throughput and
> >>>>>>>> low-latency demands. Is my assumption correct?
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Regards,
> >>>>>>>> Tao
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Regards,
> >>>>>> Tao
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
>


-- 
-- Guozhang
