consumer.timeout.ms only affects how the stream reads data from the
internal chunk queue that buffers received data. The actual data fetching
is done by a separate fetcher thread, kafka.consumer.ConsumerFetcherThread.
The fetcher thread keeps reading data from the broker and puts it into the
queue, while the stream keeps polling the queue and passes any data it
finds back to the consumer.
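
To make the stream side concrete, here is a minimal sketch of how a read
from the chunk queue could honor the timeout. The names StreamReadSketch
and readChunk are made up for illustration and are not Kafka classes; the
queue holds plain strings instead of real fetched chunks. As far as I
recall, the real iterator throws ConsumerTimeoutException on timeout
rather than returning null, so treat the null return as a simplification.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the stream side of the old high-level consumer.
final class StreamReadSketch {

    /**
     * Reads one chunk from the shared chunk queue.
     * timeoutMs < 0  : block until a chunk arrives (like consumer.timeout.ms=-1).
     * timeoutMs >= 0 : wait at most that long and return null if nothing arrived.
     */
    static String readChunk(BlockingQueue<String> chunkQueue, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs < 0) {
            return chunkQueue.take();
        }
        return chunkQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> chunkQueue = new LinkedBlockingQueue<>();
        chunkQueue.put("chunk-from-AC-0");

        // First read finds data in the queue.
        System.out.println(readChunk(chunkQueue, 100));
        // Second read finds the queue empty and gives up after the timeout.
        System.out.println(readChunk(chunkQueue, 100));
    }
}
```

Note that the timeout only governs the wait on the queue; it says nothing
about how the queue gets filled.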

So in a case like createMessageStreams("AC" => 1), the same stream (which
means the same chunk queue) is shared by multiple partitions of topic AC.
If one of the partitions has no data, the consumer is still able to read
data from the other partitions, as the fetcher thread keeps feeding data
from those partitions into the queue.
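
A rough simulation of that sharing, assuming a single fetcher thread and
one unbounded queue. SharedQueueSketch and consumeAll are hypothetical
names, partitions are modeled as in-memory lists, and the fetcher is
joined before reading only to keep the run deterministic (the real fetcher
runs concurrently with the stream).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model: one fetcher thread, one chunk queue, many partitions.
final class SharedQueueSketch {

    /** Feeds every partition's messages into one queue, then drains it. */
    static List<String> consumeAll(Map<String, List<String>> partitions)
            throws InterruptedException {
        BlockingQueue<String> chunkQueue = new LinkedBlockingQueue<>();

        // The fetcher visits each partition; an empty one contributes nothing
        // but does not stop the others from being enqueued.
        Thread fetcher = new Thread(() -> {
            for (Map.Entry<String, List<String>> p : partitions.entrySet()) {
                for (String msg : p.getValue()) {
                    chunkQueue.add(p.getKey() + ":" + msg);
                }
            }
        });
        fetcher.start();
        fetcher.join(); // simplification to make the result deterministic

        // The stream side polls the shared queue until it times out.
        List<String> received = new ArrayList<>();
        String chunk;
        while ((chunk = chunkQueue.poll(100, TimeUnit.MILLISECONDS)) != null) {
            received.add(chunk);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        partitions.put("AC-0", Arrays.asList("m1", "m2"));
        partitions.put("AC-1", Collections.<String>emptyList()); // no data
        partitions.put("AC-2", Arrays.asList("m3"));
        System.out.println(consumeAll(partitions));
    }
}
```

The empty partition AC-1 simply never shows up in the result; messages
from AC-0 and AC-2 still reach the consumer through the same queue.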

The only situation where the consumer will get stuck is when the fetcher
thread is blocked by the network, for example by high network latency
between consumer and broker, or when there is no data from the broker.
This is because the fetcher thread is implemented using blocking I/O.


On Wed, Mar 11, 2015 at 8:15 AM, James Cheng <jch...@tivo.com> wrote:

> Hi,
>
> Sorry to bring up this old thread, but my question is about this exact
> thing:
>
> Guozhang, you said:
> > A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> > partitions.
> >
> > With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> > be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >
> > With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> > created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> > respectively.
>
>
> You said that in the createMessageStreamsByFilter case, if topic AC had no
> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
> be blocked waiting for data to arrive from topic AC, and so messages from
> BC would not be processed.
>
> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> same problem but just worse. Behind the scenes, is there a single thread
> that is consuming (round-robin?) messages from the different partitions and
> inserting them all into a single queue for the application code to process?
> And is that why a single partition with no messages will block the other
> messages from getting through?
>
> What about createMessageStreams("AC" => 1)? That creates a single stream
> that contains messages from multiple partitions, which might be on
> different brokers. Does that also suffer from the same problem, where if
> one partition has no messages, the application would not receive messages
> from the other partitions?
>
> Thanks,
> -James
>
>
> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > The new consumer will be released in 0.9, which is targeted for end of
> > this quarter.
> >
> > On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> Do you know when the new consumer API will be publicly available?
> >>
> >> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >>> Yes, it can get stuck. For example, if AC and BC are processed by two
> >>> different processes and the AC processor gets stuck, AC messages will
> >>> fill up the consumer's buffer and eventually prevent the fetcher thread
> >>> from putting more data into it; the fetcher thread will be blocked on
> >>> that and will not be able to fetch BC.
> >>>
> >>> This issue has been addressed in the new consumer client, which is
> >>> single-threaded with non-blocking APIs.
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>
> >>>> Thank you Guozhang for your detailed explanation. In your example
> >>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
> >>>> topics, there may be a situation where all 3 threads get stuck with
> >>>> topic AC (e.g. the topic is empty), which will hold up the connecting
> >>>> threads (setting consumer.timeout.ms=-1), hence there is no thread to
> >>>> serve topic BC. Do you think this situation will happen?
> >>>>
> >>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>>> I was not clear before: for createMessageStreamsByFilter, each matched
> >>>>> topic will have num-threads, but shared, i.e. a total of num-threads
> >>>>> will be created, and each thread will be responsible for fetching all
> >>>>> matched topics.
> >>>>>
> >>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> >>>>> partitions.
> >>>>>
> >>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> >>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>>>
> >>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> >>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> >>>>> respectively.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>
> >>>>>> Guozhang,
> >>>>>>
> >>>>>> Do you mean that each regex-matched topic owns the number of threads that
> >>>>>> get passed in to createMessageStreamsByFilter? For example, in the code
> >>>>>> below, if I have 3 matched topics, each of which has 2 partitions, then I
> >>>>>> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
> >>>>>>
> >>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>
> >>>>>> int threadTotal = 2;
> >>>>>>
> >>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>
> >>>>>>
> >>>>>> But what I observed from the log is different
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>> partition 1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>> partitions consumed by consumer thread
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> >>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>> partition 1
> >>>>>>
> >>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>> partition 0
> >>>>>>
> >>>>>>
> >>>>>> As you can see from the log, there are only 2 threads created and shared
> >>>>>> among 3 topics. With this setting I think the parallelism is degraded and
> >>>>>> a slow topic may impact other topics' consumption performance. Any
> >>>>>> thoughts?
> >>>>>>
> >>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> createMessageStreams is used for consuming from specific topic(s),
> >>>>>>> where you can put a map of [topic-name, num-threads] as its input
> >>>>>>> parameters;
> >>>>>>>
> >>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
> >>>>>>> where you can put a (regex, num-threads) as its input parameters, and for
> >>>>>>> each regex matched topic num-threads will be created.
> >>>>>>>
> >>>>>>> The difference between these two is not really about throughput /
> >>>>>>> latency, but rather consumption semantics.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi team,
> >>>>>>>>
> >>>>>>>> I am comparing the differences between
> >>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding is
> >>>>>>>> that createMessageStreams creates x number of threads (x is the number
> >>>>>>>> of threads passed in to the method) dedicated to the specified topic,
> >>>>>>>> while createMessageStreamsByFilter creates x number of threads shared by
> >>>>>>>> the topics specified by TopicFilter. Is that correct?
> >>>>>>>>
> >>>>>>>> If this is the case, I assume createMessageStreams is the preferred way
> >>>>>>>> to create streams for each topic if I have high throughput and low
> >>>>>>>> latency demands. Is my assumption correct?
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Regards,
> >>>>>>>> Tao
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Regards,
> >>>>>> Tao
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
>


-- 
Regards,
Tao
