On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi James,
> 
> What I meant before is that a single fetcher may be responsible for putting
> fetched data to multiple queues according to the construction of the
> streams setup, where each queue may be consumed by a different thread. And
> the queues are actually bounded. Now say if there are two queues that are
> getting data from the same fetcher F, and are consumed by two different
> user threads A and B. If thread A for some reason got slowed / hung
> consuming data from queue 1, then queue 1 will eventually get full, and F
> trying to put more data to it will be blocked. Since F is parked on trying
> to put data to queue 1, queue 2 will not get more data from it, and thread
> B may hence get starved. Does that make sense now?
> 

Yes, that makes sense. That is the scenario where one thread of a consumer can 
cause a backup in the queue, which would cause other threads to not receive 
data.
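
To make that scenario concrete, here is a minimal Java sketch of it. It 
assumes the fetcher hands data to plain java.util.concurrent bounded queues; 
the class and method names are made up for illustration and this is not the 
actual Kafka fetcher code. One fetcher alternates between two queues, nothing 
drains queue 1 (the hung thread A), and the caller plays thread B on queue 2:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class HeadOfLineBlockingDemo {

    /**
     * Runs one fetcher thread that alternates between two bounded queues.
     * Nothing ever drains queue1 (the hung thread A); the caller acts as
     * thread B and drains queue2. Returns how many messages B received
     * before queue2 went quiet.
     */
    static int messagesSeenByB(int capacity, int totalMessages) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(capacity);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < totalMessages; i++) {
                    // Even messages go to queue1, odd ones to queue2.
                    BlockingQueue<String> target = (i % 2 == 0) ? queue1 : queue2;
                    target.put("msg-" + i); // parks here once the target is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // demo is over
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        int received = 0;
        try {
            // Thread B: keep draining queue2 until nothing arrives for 500 ms.
            while (queue2.poll(500, TimeUnit.MILLISECONDS) != null) {
                received++;
            }
            fetcher.interrupt(); // release the parked fetcher
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        int seen = messagesSeenByB(2, 100);
        System.out.println("B received " + seen + " of 50 messages before starving");
    }
}
```

With a capacity of 2, B only ever sees 2 of its 50 messages: the fetcher 
parks on the third put to queue 1 and never gets back to queue 2, even though 
B is keeping up.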

What about the situation I described, where a thread consumes a queue that is 
supposed to be filled with messages from multiple partitions? If partition A 
has no messages and partitions B and C do, how will the fetcher behave? Will 
the processing thread receive messages from partitions B and C?

Thanks,
-James


> Guozhang
> 
> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> 
>> Hi,
>> 
>> Sorry to bring up this old thread, but my question is about this exact
>> thing:
>> 
>> Guozhang, you said:
>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>> partitions.
>>> 
>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>> 
>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>> respectively.
>> 
>> 
>> You said that in the createMessageStreamsByFilter case, if topic AC had no
>> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
>> be blocked waiting for data to arrive from topic AC, and so messages from
>> BC would not be processed.
>> 
>> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
>> same problem but just worse. Behind the scenes, is there a single thread
>> that is consuming (round-robin?) messages from the different partitions and
>> inserting them all into a single queue for the application code to process?
>> And is that why a single partition with no messages will block the other
>> messages from getting through?
>> 
>> What about createMessageStreams("AC" => 1)? That creates a single stream
>> that contains messages from multiple partitions, which might be on
>> different brokers. Does that also suffer the same problem, where if one
>> partition has no messages, that the application would not receive messages
>> from the other partitions?
>> 
>> Thanks,
>> -James
>> 
>> 
>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> 
>>> The new consumer will be released in 0.9, which is targeted for the end
>>> of this quarter.
>>> 
>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> 
>>>> Do you know when the new consumer API will be publicly available?
>>>> 
>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Yes, it can get stuck. For example, say AC and BC are processed by two
>>>>> different processes and the AC processor gets stuck. AC messages will
>>>>> then fill up the consumer's buffer and eventually prevent the fetcher
>>>>> thread from putting more data into it; the fetcher thread will be
>>>>> blocked on that and not be able to fetch BC.
>>>>> 
>>>>> This issue has been addressed in the new consumer client, which is
>>>>> single-threaded with non-blocking APIs.
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Thank you Guozhang for your detailed explanation. In your example
>>>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared
>>>>>> among topics, there may be a situation where all 3 threads get stuck
>>>>>> on topic AC, e.g. the topic is empty, which holds the connecting
>>>>>> threads (with consumer.timeout.ms=-1), so there is no thread left to
>>>>>> serve topic BC. Do you think this situation can happen?
>>>>>> 
>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> I was not clear before: for createMessageStreamsByFilter, each matched
>>>>>>> topic will have num-threads, but they are shared, i.e. num-threads
>>>>>>> threads will be created in total, and each thread will be responsible
>>>>>>> for fetching all matched topics.
>>>>>>> 
>>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>>>> partitions.
>>>>>>> 
>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>>> 
>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>>>> respectively.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Guozhang,
>>>>>>>> 
>>>>>>>> Do you mean that each regex-matched topic owns the number of threads
>>>>>>>> passed in to createMessageStreamsByFilter? For example, in the code
>>>>>>>> below, if I have 3 matched topics, each of which has 2 partitions, then
>>>>>>>> I should have 3 * 2 = 6 threads in total, with each topic owning 2
>>>>>>>> threads.
>>>>>>>> 
>>>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>>>> 
>>>>>>>> int threadTotal = 2;
>>>>>>>> 
>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
>>>>>>>> 
>>>>>>>> 
>>>>>>>> But what I observed from the log is different
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>> partition 1
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>> partition 0
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>>>> partitions consumed by consumer thread
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>> partition 0
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>>>> partition 1
>>>>>>>> 
>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>>>> partition 0
>>>>>>>> 
>>>>>>>> 
>>>>>>>> As you can see from the log, only 2 threads are created and shared
>>>>>>>> among 3 topics. With this setting I think the parallelism is degraded
>>>>>>>> and a slow topic may impact other topics' consumption performance. Any
>>>>>>>> thoughts?
>>>>>>>> 
>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> createMessageStreams is used for consuming from specific topic(s),
>>>>>>>>> where you can put a map of [topic-name, num-threads] as its input
>>>>>>>>> parameters;
>>>>>>>>> 
>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
>>>>>>>>> topics, where you can put a (regex, num-threads) as its input
>>>>>>>>> parameters, and for each regex matched topic num-threads will be
>>>>>>>>> created.
>>>>>>>>> 
>>>>>>>>> The difference between these two is not really about throughput /
>>>>>>>>> latency, but rather consumption semantics.
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com>
>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi team,
>>>>>>>>>> 
>>>>>>>>>> I am comparing the differences between
>>>>>>>>>> ConsumerConnector.createMessageStreams
>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding
>>>>>>>>>> is that createMessageStreams creates x threads (x is the number of
>>>>>>>>>> threads passed in to the method) dedicated to the specified topic,
>>>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
>>>>>>>>>> topics specified by TopicFilter. Is that correct?
>>>>>>>>>> 
>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred
>>>>>>>>>> way to create streams for each topic if I have high-throughput and
>>>>>>>>>> low-latency demands. Is my assumption correct?
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Regards,
>>>>>>>>>> Tao
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
