It's spread out over multiple classes...

A good starting point is here:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L806

It implements the main loop that polls, calls addRecordsToTasks() (i.e.,
puts the records into buffers), and processes records.

pause/resume is done here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L720

and

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L362
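
To make the buffering idea concrete, here is a minimal sketch of the
bookkeeping only. It is not the Kafka Streams code: the class
`PartitionBuffers`, its method names, and the `maxBuffered` threshold are
all hypothetical, and partitions and records are plain strings. The caller
would still invoke `pause()` and `resume()` on its own consumer based on
the returned flags.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, NOT the Kafka Streams implementation: it only decides
// when a caller should pause() or resume() a partition on its consumer.
class PartitionBuffers {
    // Assumed threshold; 0 means "pause as soon as any data is buffered".
    private final int maxBuffered;
    private final Map<String, Deque<String>> buffers = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    PartitionBuffers(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    // Buffer the records one poll() returned for a partition.
    // Returns true only when the caller should newly pause that partition.
    boolean add(String partition, List<String> records) {
        Deque<String> queue = buffers.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(records);
        return queue.size() > maxBuffered && paused.add(partition);
    }

    // Hand out the next buffered record, or null if nothing is buffered.
    String next(String partition) {
        Deque<String> queue = buffers.get(partition);
        return queue == null ? null : queue.poll();
    }

    // Returns true only when a paused partition has drained to the threshold
    // or below, i.e. when the caller should resume it.
    boolean shouldResume(String partition) {
        Deque<String> queue = buffers.get(partition);
        int size = queue == null ? 0 : queue.size();
        return size <= maxBuffered && paused.remove(partition);
    }
}
```

With `maxBuffered` set to 0 this matches the rule described earlier in the
thread: pause a partition while its buffer has data, resume it once the
buffer is empty.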


Not blocking forever makes sense, of course. I did not mean it literally.
However, longer poll() blocking times should allow you to drain the
consumer buffer better.


-Matthias

On 10/21/18 12:54 PM, Zahari Dichev wrote:
> Thanks for your feedback, Matthias. Do you think you can point me to the
> part where Kafka Streams deals with all of that so I can take a look? I will
> try to see whether your suggested approach works for us before trying to
> argue my point further. Just a small thing to mention, though: blocking on
> the poll forever is not something that I am convinced we would like to do.
> 
> Zahari
> 
> On Sun, Oct 21, 2018 at 10:11 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>>> You have one consumer that is quite
>>>> slow so lets say it call poll every 5 seconds, while you need to call
>> poll
>>>> every 1 second to issue a heartbeat (these are made up numbers of
>> course).
>>
>> Heartbeats are sent by a background thread since 0.10.1.0 (KIP-62). And
>> there is a second configuration `max.poll.interval.ms` that you can
>> increase for a slow consumer. Kafka Streams sets it to `Integer.MAX_VALUE`
>> for example.
>>
>> For the remainder, I agree that using a buffer on top might not be a
>> perfect solution. That is what I meant by:
>>
>>> (not a 100% guarantee,
>>> depending on fetch size and `max.poll.records` etc., but with high
>>> probability)
>>
>> However, I believe that if you configure the consumer accordingly, you
>> can drain the fetch buffer if you block on `poll()` forever.
>>
>> I have to admit that I am not familiar with the details of pipelining
>> fetch requests, though. The general idea is to make sure to drain the
>> internal buffer of the consumer before you call `pause()`.
>>
>> I am curious to learn why this would not work. How does the pipelining of
>> fetch requests work in detail?
>>
>>
>> -Matthias
>>
>>
>> On 10/19/18 1:36 PM, Zahari Dichev wrote:
>>> Hi there Matthias,
>>>
>>> Very useful thoughts indeed. I have considered the exact same approach
>> but
>>> what worries me a bit is that I do not think that will certainly solve
>> the
>>> issue. Imagine the following situation. You have one consumer that is
>> quite
>>> slow, so let's say it calls poll every 5 seconds, while you need to call
>> poll
>>> every 1 second to issue a heartbeat (these are made up numbers of
>> course).
>>>
>>> So our consumer calls poll at T0 grabs some data and puts it in a buffer
>>> and calls pause on the topic partition. The fetcher tries to pipeline and
>>> issues a fetch request and at some point the data arrives. At that point
>> we
>>> have some data in the buffer and we can do whatever we want with it, but
>>> there is also some data living within the consumer/fetcher. Approximately
>>> one second later we call poll again because we need to. We are not
>> getting
>>> any data because our partition is paused and this is good because if we
>> got
>>> data we would not know what to do with it as our client is still busy
>>> crunching the data from the first poll. So far so good. What happens
>> though
>>> is that the pre-fetched data gets thrown away upon calling poll as its
>> "no
>>> longer fetchable...".  Maybe I am not fully understanding your suggested
>>> approach, but I don't think it would solve this problem.
>>>
>>> Zahari
>>>
>>> On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Just my 2 cents.
>>>>
>>>> I am not 100% sure if we would need to change the consumer for this.
>>>> While I still think, that KIP-349 might be valuable, it seems to be
>>>> complementary/orthogonal to the issue discussed here.
>>>>
>>>> For Kafka Streams, we have a related scenario and what Kafka Streams
>>>> does is, to add its own buffer on top of the consumer. Thus, for each
>>>> `poll()` all data is put into this buffer, and now Streams can decide
>>>> which record to process first. For buffers that have data, we can call
>>>> `pause()` without losing fetched data (not a 100% guarantee,
>>>> depending on fetch size and `max.poll.records` etc., but with high
>>>> probability) and if a buffer gets empty we `resume()` partitions.
>>>>
>>>> As Akka Streams builds on top of the consumer it could implement a
>>>> similar pattern. Of course, one cannot use `enable.auto.commit` on the
>>>> consumer; commits need to be managed manually (i.e., only data that
>>>> was taken out of the buffer and actually processed can be committed).
>>>>
>>>> For the `MessageChooser` idea, I also still think it might be useful,
>>>> but it's unclear to me if this should be a consumer feature or built on
>>>> top of the consumer (maybe it could be a Streams feature, as Streams is
>>>> built on top of the consumer). Thoughts?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/18/18 9:27 AM, Jan Filipiak wrote:
>>>>> The idea for you would be that MessageChooser could hang on to the
>>>>> prefetched messages.
>>>>>
>>>>> ccing cmcc...@apache.org
>>>>>
>>>>> @Collin
>>>>> just for you to see that MessageChooser is a powerful abstraction.
>>>>>
>>>>> :)
>>>>>
>>>>> Best jan
>>>>>
>>>>> On 18.10.2018 13:59, Zahari Dichev wrote:
>>>>>> Jan,
>>>>>>
>>>>>> Quite insightful indeed. I think your propositions are valid.
>>>>>>
>>>>>> Ryanne,
>>>>>>
>>>>>> I understand that consumers are using a pull model... And yes, indeed
>>>> if a
>>>>>> consumer is not ready for more records it surely should not call poll.
>>>>>> Except that it needs to do so periodically in order to indicate that
>> its
>>>>>> live. Forget about the "backpressure", I guess I was wrong with
>> phrasing
>>>>>> this so lets not get caught up on it.
>>>>>>
>>>>>> You say pause/resume can be used to prioritise certain
>> topics/partitions
>>>>>> over others. And indeed this is the case. So instead of thinking about
>>>> it
>>>>>> in terms of backpressure, lets put it in a different way. The Akka
>>>> streams
>>>>>> connector would like to prioritise certain topics over others, using
>>>> one
>>>>>> consumer instance. On top of that, add the detail that the priorities
>>>>>> change quite frequently (which translates to calling pause/resume
>>>>>> frequently). So all that being said, what would be a proper way to
>>>> handle
>>>>>> the situation without throwing the pre-fetched records away when
>> calling
>>>>>> poll on a consumer that happens to have a topic that was recently
>> paused
>>>>>> (and that might be un-paused soon )? Am I the only one who considers
>>>> that
>>>>>> an actual problem with the use of pause/resume? Not sure how to
>> explain
>>>>>> the situation in a better way..
>>>>>>
>>>>>> Zahari
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com
>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks a lot Jan,
>>>>>>>
>>>>>>> I will read it.
>>>>>>>
>>>>>>> Zahari
>>>>>>>
>>>>>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <
>> jan.filip...@trivago.com
>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> especially my suggestions ;)
>>>>>>>>
>>>>>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>>>>>> Hi Zahari,
>>>>>>>>>
>>>>>>>>> would you be willing to scan through the KIP-349 discussion a
>> little?
>>>>>>>>> I think it has suggestions that could be interesting for you
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>>>>>> Hi there Kafka developers,
>>>>>>>>>>
>>>>>>>>>> I am currently trying to find a solution to an issue that has been
>>>>>>>>>> manifesting itself in the Akka streams implementation of the Kafka
>>>>>>>>>> connector. When it comes to consuming messages, the implementation
>>>>>>>> relies
>>>>>>>>>> heavily on the fact that we can pause and resume partitions. In
>> some
>>>>>>>>>> situations when a single consumer instance is shared among several
>>>>>>>>>> streams,
>>>>>>>>>> we might end up with frequently pausing and unpausing a set of
>> topic
>>>>>>>>>> partitions, which is the main facility that allows us to implement
>>>> back
>>>>>>>>>> pressure. This however has certain disadvantages, especially when
>>>>>>>>>> there are
>>>>>>>>>> two consumers that differ in terms of processing speed.
>>>>>>>>>>
>>>>>>>>>> To articulate the issue more clearly, imagine that a consumer
>>>> maintains
>>>>>>>>>> assignments for two topic partitions *TP1* and *TP2*. This
>> consumer
>>>> is
>>>>>>>>>> shared by two streams - S1 and S2. So effectively when we have
>>>> demand
>>>>>>>>>> from
>>>>>>>>>> only one of the streams - *S1*, we will pause one of the topic
>>>>>>>> partitions
>>>>>>>>>> *TP2* and call *poll()* on the consumer to only retrieve the
>> records
>>>>>>>> for
>>>>>>>>>> the demanded topic partition - *TP1*. The result of that is all
>> the
>>>>>>>>>> records
>>>>>>>>>> that have been prefetched for *TP2* are now thrown away by the
>>>> fetcher
>>>>>>>>>> ("*Not
>>>>>>>>>> returning fetched records for assigned partition TP2 since it is
>> no
>>>>>>>>>> longer
>>>>>>>>>> fetchable"*). If we extrapolate that to multiple streams sharing
>> the
>>>>>>>> same
>>>>>>>>>> consumer, we might quickly end up in a situation where we throw
>>>>>>>>>> prefetched
>>>>>>>>>> data quite often. This does not seem like the most efficient
>>>> approach
>>>>>>>> and
>>>>>>>>>> in fact produces quite a lot of overlapping fetch requests as
>>>>>>>> illustrated
>>>>>>>>>> in the following issue:
>>>>>>>>>>
>>>>>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>>>>>
>>>>>>>>>> I am writing this email to get some initial opinion on a KIP I was
>>>>>>>>>> thinking
>>>>>>>>>> about. What if we give the clients of the Consumer API a bit more
>>>>>>>> control
>>>>>>>>>> of what to do with this prefetched data. Two options I am
>> wondering
>>>>>>>>>> about:
>>>>>>>>>>
>>>>>>>>>> 1. Introduce a configuration setting, such as*
>>>>>>>>>> "return-prefetched-data-for-paused-topic-partitions = false"*
>> (have
>>>> to
>>>>>>>>>> think of a better name), which when set to true will return what
>> is
>>>>>>>>>> prefetched instead of throwing it away on calling *poll()*. Since
>>>> this
>>>>>>>> is
>>>>>>>>>> amount of data that is bounded by the maximum size of the
>> prefetch,
>>>> we
>>>>>>>>>> can
>>>>>>>>>> control what is the most amount of records returned. The client of
>>>> the
>>>>>>>>>> consumer API can then be responsible for keeping that data around
>>>> and
>>>>>>>> use
>>>>>>>>>> it when appropriate (i.e. when demand is present)
>>>>>>>>>>
>>>>>>>>>> 2. Introduce a facility to pass in a buffer into which the
>>>> prefetched
>>>>>>>>>> records are drained when poll is called and paused partitions have
>>>> some
>>>>>>>>>> prefetched records.
>>>>>>>>>>
>>>>>>>>>> Any opinions on the matter are welcome. Thanks a lot !
>>>>>>>>>>
>>>>>>>>>> Zahari Dichev
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>>
> 
