Just my 2 cents.

I am not 100% sure if we would need to change the consumer for this.
While I still think that KIP-349 might be valuable, it seems to be
complementary/orthogonal to the issue discussed here.

For Kafka Streams, we have a related scenario, and what Kafka Streams
does is add its own buffer on top of the consumer. Thus, for each
`poll()` all data is put into this buffer, and Streams can then decide
which record to process first. For buffers that have data, we can call
`pause()` without losing fetched data (not a 100% guarantee, depending
on fetch size, `max.poll.records`, etc., but with high probability), and
if a buffer becomes empty we `resume()` its partitions.
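
To make the buffering idea a bit more concrete, here is a rough sketch of
the bookkeeping only. It is not the actual Kafka Streams code: the class
`PartitionBuffers` and its methods are hypothetical, and partitions and
records are plain strings so the example runs without the Kafka client.
With the real consumer, the two sets would be mapped to `TopicPartition`
objects and passed to `pause()` and `resume()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical per-partition buffer layered on top of a consumer. */
class PartitionBuffers {
    private final Map<String, Deque<String>> buffers = new LinkedHashMap<>();

    /** Store everything a poll() returned for one partition. */
    void add(String partition, List<String> records) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(records);
    }

    /** Take the next buffered record for a partition, or null if none is buffered. */
    String next(String partition) {
        Deque<String> queue = buffers.get(partition);
        return queue == null ? null : queue.poll();
    }

    /** Partitions that still hold buffered data: candidates for pause(). */
    Set<String> toPause() {
        Set<String> result = new TreeSet<>();
        buffers.forEach((p, q) -> { if (!q.isEmpty()) result.add(p); });
        return result;
    }

    /** Partitions whose buffer has drained: candidates for resume(). */
    Set<String> toResume() {
        Set<String> result = new TreeSet<>();
        buffers.forEach((p, q) -> { if (q.isEmpty()) result.add(p); });
        return result;
    }
}
```

The application decides which partition to call `next()` on, so the
prioritisation lives entirely outside the consumer.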

As Akka Streams builds on top of the consumer, it could implement a
similar pattern. Of course, one cannot use auto-commit
(`enable.auto.commit`) on the consumer; commits need to be managed
manually (i.e., only data that was taken out of the buffer and actually
processed can be committed).
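
The manual commit tracking could look roughly like the sketch below.
`ProcessedOffsets` is a hypothetical helper, and partitions are again
plain strings so that it runs standalone. With the real consumer, the
resulting map would be converted to `TopicPartition` and
`OffsetAndMetadata` entries and handed to `commitSync()`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that tracks which offsets are safe to commit. */
class ProcessedOffsets {
    private final Map<String, Long> nextOffset = new HashMap<>();

    /** Call only after a record taken from the buffer has been fully processed. */
    void markProcessed(String partition, long offset) {
        // The committed offset is the next offset to read, i.e. last processed + 1.
        nextOffset.merge(partition, offset + 1, Math::max);
    }

    /** Snapshot of offsets for a manual commit; records still in the buffer are not covered. */
    Map<String, Long> committable() {
        return new HashMap<>(nextOffset);
    }
}
```

Records that are buffered but not yet processed never reach
`markProcessed()`, so after a restart they are simply fetched again.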

For the `MessageChooser` idea, I also still think it might be useful,
but it's unclear to me if this should be a consumer feature or built on
top of the consumer (maybe it could be a Streams feature, as Streams is
built on top of the consumer). Thoughts?


-Matthias

On 10/18/18 9:27 AM, Jan Filipiak wrote:
> The idea for you would be that Messagechooser could hang on to the 
> prefetched messages.
> 
> ccing cmcc...@apache.org
> 
> @Collin
> just for you to see that MessageChooser is a powerful abstraction.
> 
> :)
> 
> Best jan
> 
> On 18.10.2018 13:59, Zahari Dichev wrote:
>> Jan,
>>
>> Quite insightful indeed. I think your propositions are valid.
>>
>> Ryanne,
>>
>> I understand that consumers are using a pull model... And yes, indeed if a
>> consumer is not ready for more records it surely should not call poll.
>> Except that it needs to do so periodically in order to indicate that it's
>> alive. Forget about the "backpressure", I guess I was wrong with phrasing
>> this so let's not get caught up on it.
>>
>> You say pause/resume can be used to prioritise certain topics/partitions
>> over others. And indeed this is the case. So instead of thinking about it
>> in terms of backpressure, let's put it in a different way. The Akka Streams
>> connector would like to prioritise certain topics over others, using one
>> consumer instance. On top of that, add the detail that the priorities
>> change quite frequently (which translates to calling pause/resume
>> frequently). So all that being said, what would be a proper way to handle
>> the situation without throwing the pre-fetched records away when calling
>> poll on a consumer that happens to have a topic that was recently paused
>> (and that might be un-paused soon )? Am I the only one who considers that
>> an actual problem with the use of pause/resume ? Not sure how to explain
>> the situation in a better way..
>>
>> Zahari
>>
>>
>> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
>> wrote:
>>
>>> Thanks a lot Jan,
>>>
>>> I will read it.
>>>
>>> Zahari
>>>
>>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> especially my suggestions ;)
>>>>
>>>> On 18.10.2018 08:30, Jan Filipiak wrote:
>>>>> Hi Zahari,
>>>>>
>>>>> would you be willing to scan through the KIP-349 discussion a little?
>>>>> I think it has suggestions that could be interesting for you
>>>>>
>>>>> Best Jan
>>>>>
>>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
>>>>>> Hi there Kafka developers,
>>>>>>
>>>>>> I am currently trying to find a solution to an issue that has been
>>>>>> manifesting itself in the Akka streams implementation of the Kafka
>>>>>> connector. When it comes to consuming messages, the implementation
>>>> relies
>>>>>> heavily on the fact that we can pause and resume partitions. In some
>>>>>> situations when a single consumer instance is shared among several
>>>>>> streams,
>>>>>> we might end up with frequently pausing and unpausing a set of topic
>>>>>> partitions, which is the main facility that allows us to implement back
>>>>>> pressure. This however has certain disadvantages, especially when
>>>>>> there are
>>>>>> two consumers that differ in terms of processing speed.
>>>>>>
>>>>>> To articulate the issue more clearly, imagine that a consumer maintains
>>>>>> assignments for two topic partitions *TP1* and *TP2*. This consumer is
>>>>>> shared by two streams - S1 and S2. So effectively when we have demand
>>>>>> from
>>>>>> only one of the streams - *S1*, we will pause one of the topic
>>>> partitions
>>>>>> *TP2* and call *poll()* on the consumer to only retrieve the records
>>>> for
>>>>>> the demanded topic partition - *TP1*. The result of that is all the
>>>>>> records
>>>>>> that have been prefetched for *TP2* are now thrown away by the fetcher
>>>>>> ("*Not
>>>>>> returning fetched records for assigned partition TP2 since it is no
>>>>>> longer
>>>>>> fetchable"*). If we extrapolate that to multiple streams sharing the
>>>> same
>>>>>> consumer, we might quickly end up in a situation where we throw
>>>>>> prefetched
>>>>>> data quite often. This does not seem like the most efficient approach
>>>> and
>>>>>> in fact produces quite a lot of overlapping fetch requests as
>>>> illustrated
>>>>>> in the following issue:
>>>>>>
>>>>>> https://github.com/akka/alpakka-kafka/issues/549
>>>>>>
>>>>>> I am writing this email to get some initial opinion on a KIP I was
>>>>>> thinking
>>>>>> about. What if we give the clients of the Consumer API a bit more
>>>> control
>>>>>> of what to do with this prefetched data. Two options I am wondering
>>>>>> about:
>>>>>>
>>>>>> 1. Introduce a configuration setting, such as*
>>>>>> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
>>>>>> think of a better name), which when set to true will return what is
>>>>>> prefetched instead of throwing it away on calling *poll()*. Since this
>>>> is
>>>>>> amount of data that is bounded by the maximum size of the prefetch, we
>>>>>> can
>>>>>> control what is the most amount of records returned. The client of the
>>>>>> consumer API can then be responsible for keeping that data around and
>>>> use
>>>>>> it when appropriate (i.e. when demand is present)
>>>>>>
>>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
>>>>>> records are drained when poll is called and paused partitions have some
>>>>>> prefetched records.
>>>>>>
>>>>>> Any opinions on the matter are welcome. Thanks a lot !
>>>>>>
>>>>>> Zahari Dichev
>>>>>>
>>>>
>>>
>>
