On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi James,
>
> What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues, depending on how the streams are set up, and each queue may be consumed by a different thread. The queues are also bounded. Now say there are two queues that get data from the same fetcher F and are consumed by two different user threads A and B. If thread A for some reason slows down or hangs while consuming from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get any more data from it, and thread B may hence get starved. Does that make sense now?
>

Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would prevent other threads from receiving data.

What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?

Thanks,
-James

> Guozhang
>
> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
>
>> Hi,
>>
>> Sorry to bring up this old thread, but my question is about this exact thing:
>>
>> Guozhang, you said:
>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
>>>
>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively;
>>>
>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6 respectively.
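The grouping in the concrete example above can be reproduced with a short Java sketch. It assumes a per-topic range split (each topic's partitions are divided into contiguous blocks across the threads that consume that topic) and numbers partitions from 1, as the example does. The class and method names (StreamAssignmentSketch, rangeAssign, dedicated, shared) are hypothetical and are not part of the Kafka consumer API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Kafka code): reproduces the AC/BC example,
 * assuming each topic's partitions are range-split over its threads.
 */
public class StreamAssignmentSketch {

    /** Splits partitions 1..numPartitions of one topic into contiguous blocks over numThreads threads. */
    static List<List<String>> rangeAssign(String topic, int numPartitions, int numThreads) {
        List<List<String>> perThread = new ArrayList<>();
        int base = numPartitions / numThreads;   // partitions every thread gets
        int extra = numPartitions % numThreads;  // the first `extra` threads get one more
        int next = 1;                            // the example numbers partitions from 1
        for (int t = 0; t < numThreads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            perThread.add(owned);
        }
        return perThread;
    }

    /** createMessageStreams-style: each topic gets its own dedicated threads. Value = {partitions, threads}. */
    static List<List<String>> dedicated(Map<String, int[]> topics) {
        List<List<String>> threads = new ArrayList<>();
        for (Map.Entry<String, int[]> e : topics.entrySet()) {
            threads.addAll(rangeAssign(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return threads;
    }

    /** createMessageStreamsByFilter-style: numThreads threads shared by all matched topics. */
    static List<List<String>> shared(Map<String, Integer> partitionsPerTopic, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            List<List<String>> perThread = rangeAssign(e.getKey(), e.getValue(), numThreads);
            for (int t = 0; t < numThreads; t++) {
                threads.get(t).addAll(perThread.get(t)); // same thread serves every matched topic
            }
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, int[]> perTopic = new LinkedHashMap<>();
        perTopic.put("AC", new int[] {3, 3});
        perTopic.put("BC", new int[] {6, 2});
        System.out.println(dedicated(perTopic));
        // [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]

        Map<String, Integer> matched = new LinkedHashMap<>();
        matched.put("AC", 3);
        matched.put("BC", 6);
        System.out.println(shared(matched, 3));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
    }
}
```

The two helpers differ only in whether the thread list is built per topic or shared across all matched topics, which is the dedicated-versus-shared distinction the thread is about.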
>>
>> You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed.
>>
>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the same problem, just worse. Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through?
>>
>> What about createMessageStreams("AC" => 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer from the same problem, where if one partition has no messages, the application would not receive messages from the other partitions?
>>
>> Thanks,
>> -James
>>
>>
>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> The new consumer will be released in 0.9, which is targeted for the end of this quarter.
>>>
>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>
>>>> Do you know when the new consumer API will be publicly available?
>>>>
>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> Yes, it can get stuck. For example, AC and BC are processed by two different processes and the AC processor gets stuck; AC messages will then fill up the consumer's buffer and eventually prevent the fetcher thread from putting more data into it. The fetcher thread will be blocked on that and will not be able to fetch BC.
>>>>>
>>>>> This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs.
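The blocking behaviour described in this thread (one fetcher feeding several bounded queues, one of whose consumers has stalled) can be modelled with plain java.util.concurrent primitives. This is a toy model, not Kafka code: the "fetcher" is a single thread that alternates put() calls on two ArrayBlockingQueue instances, consumer thread A never drains queue 1, and the class name FetcherBlockingSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Toy model (not Kafka code): one fetcher thread feeds two bounded queues.
 * Thread A never drains queue1, so the fetcher eventually blocks in put()
 * and thread B stops receiving data even though it is idle.
 */
public class FetcherBlockingSketch {

    /** Returns how many messages thread B received before the fetcher got stuck. */
    static int run(int queueCapacity, int rounds) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(queueCapacity); // read by hung thread A
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(queueCapacity); // read by thread B

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    queue1.put("AC-" + i); // parks forever once queue1 is full
                    queue2.put("BC-" + i); // so this line is never reached again
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        // Thread B drains queue2 until nothing arrives for 500 ms.
        int received = 0;
        try {
            while (queue2.poll(500, TimeUnit.MILLISECONDS) != null) {
                received++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fetcher.interrupt(); // unblock the stuck put() so the demo can exit
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("B received " + run(5, 100) + " of 100 BC messages");
    }
}
```

With a capacity of 5, B should see only the first 5 BC messages: once queue 1 is full, the fetcher is parked in put() and never reaches queue 2 again.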
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter("*C" => 3), since threads are shared among topics, there may be a situation where all 3 threads get stuck on topic AC (e.g. the topic is empty), which will hold the consuming threads (with consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do you think this situation will happen?
>>>>>>
>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> I was not clear before. For createMessageStreamsByFilter each matched topic will have num-threads, but shared: i.e. there will be num-threads created in total, and each thread will be responsible for fetching all matched topics.
>>>>>>>
>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions.
>>>>>>>
>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively;
>>>>>>>
>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6 respectively.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Guozhang,
>>>>>>>>
>>>>>>>> Do you mean that each regex-matched topic owns the number of threads that gets passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics, each of which has 2 partitions, then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
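tao's scenario hinges on consumer.timeout.ms=-1, under which a consumer thread waits indefinitely for its next message; with a non-negative value, the old consumer's iterator instead throws kafka.consumer.ConsumerTimeoutException if nothing arrives in time. The sketch below is a simplified model of that choice on top of a BlockingQueue, not the real ConsumerIterator; the names ConsumerTimeoutSketch, next and StreamTimeoutException (a stand-in for ConsumerTimeoutException) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Simplified model (assumption, not the real ConsumerIterator): how a stream's
 * consumer thread waits on its queue depending on consumer.timeout.ms.
 */
public class ConsumerTimeoutSketch {

    /** Hypothetical stand-in for the old consumer's ConsumerTimeoutException. */
    static class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(long ms) {
            super("no message within " + ms + " ms");
        }
    }

    static String next(BlockingQueue<String> queue, long consumerTimeoutMs) {
        try {
            if (consumerTimeoutMs < 0) {
                // consumer.timeout.ms = -1: park the thread until data arrives, however long that takes
                return queue.take();
            }
            String msg = queue.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) {
                throw new StreamTimeoutException(consumerTimeoutMs);
            }
            return msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("BC-1");
        System.out.println(next(queue, -1)); // data is available, so even -1 returns at once
        try {
            next(queue, 100); // the queue is now empty
        } catch (StreamTimeoutException e) {
            // A finite timeout hands control back to the thread, which can then do other work.
            System.out.println("timed out: " + e.getMessage());
        }
        // next(queue, -1) on the empty queue would park this thread indefinitely.
    }
}
```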
>>>>>>>>
>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>>
>>>>>>>> int threadTotal = 2;
>>>>>>>>
>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>>         .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>>
>>>>>>>>
>>>>>>>> But what I observed from the log is different:
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
>>>>>>>>
>>>>>>>> 2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
>>>>>>>>
>>>>>>>>
>>>>>>>> As you can see from the log, only 2 threads are created, and they are shared among 3 topics. With this setting I think the parallelism is degraded and a slow topic may impact other topics' consumption performance. Any thoughts?
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where you pass a map of [topic-name, num-threads] as its input parameter;
>>>>>>>>>
>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics, where you pass a (regex, num-threads) pair as its input parameters, and for each regex-matched topic num-threads will be created.
>>>>>>>>>
>>>>>>>>> The difference between these two is not really about throughput / latency, but rather about consumption semantics.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi team,
>>>>>>>>>>
>>>>>>>>>> I am comparing the differences between ConsumerConnector.createMessageStreams and ConsumerConnector.createMessageStreamsByFilter. My understanding is that createMessageStreams creates x number of threads (x is the number of threads passed in to the method) dedicated to the specified topic, while createMessageStreamsByFilter creates x number of threads shared by the topics specified by the TopicFilter. Is that correct?
>>>>>>>>>>
>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred way to create streams for each topic if I have high-throughput and low-latency demands. Is my assumption correct?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Regards,
>>>>>>>>>> Tao
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Tao
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Tao
>>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Tao
>>>>
>>>
>>> --
>>> -- Guozhang
>>
>
> --
> -- Guozhang
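tao's rebalance log can be replayed with a small Java sketch, assuming each matched topic's partitions are range-split over the same two consumer threads (threadTotal = 2) and numbered from 0, as in the log. The partition counts (2, 1 and 2) are read off the ArrayBuffer(...) entries in the log; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Kafka code): replays the rebalance from tao's log,
 * assuming every matched topic is range-split over the SAME threadTotal threads.
 */
public class SharedThreadsLogSketch {

    /** For one topic, returns the partitions (0-based, like the log) claimed by each thread. */
    static List<List<Integer>> claim(int numPartitions, int numThreads) {
        List<List<Integer>> claims = new ArrayList<>();
        int base = numPartitions / numThreads;   // partitions every thread gets
        int extra = numPartitions % numThreads;  // the first `extra` threads get one more
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            claims.add(owned);
        }
        return claims;
    }

    public static void main(String[] args) {
        int threadTotal = 2; // as passed to createMessageStreamsByFilter in tao's code
        Map<String, Integer> topics = new LinkedHashMap<>(); // partition counts from the log
        topics.put("kafkatopic-5", 2);
        topics.put("kafkatopic-1", 1);
        topics.put("kafkatopic-4", 2);

        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            List<List<Integer>> claims = claim(e.getValue(), threadTotal);
            for (int t = 0; t < threadTotal; t++) {
                if (claims.get(t).isEmpty()) {
                    System.out.println("thread-" + t + " gets nothing from " + e.getKey());
                } else {
                    System.out.println("thread-" + t + " claims " + claims.get(t) + " of " + e.getKey());
                }
            }
        }
    }
}
```

For kafkatopic-1, which has a single partition, thread 1 ends up with nothing, which lines up with the WARN line for consumer thread test1234dd5_localhost-1423585444070-82f23758-1; overall, 2 threads serve all 3 topics rather than 2 threads per topic.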