Hi there Matthias,

Very useful thoughts indeed. I have considered the exact same approach, but
what worries me is that I am not sure it will solve the issue. Imagine the
following situation. You have one consumer that is quite slow, so let's say
it calls poll every 5 seconds, while you need to call poll every 1 second to
issue a heartbeat (these are made-up numbers, of course).

So our consumer calls poll at T0, grabs some data, puts it in a buffer, and
calls pause on the topic partition. The fetcher tries to pipeline and issues
a fetch request, and at some point the data arrives. At that point we have
some data in the buffer and we can do whatever we want with it, but there is
also some data living within the consumer/fetcher. Approximately one second
later we call poll again because we need to. We do not get any data because
our partition is paused, and this is good: if we got data we would not know
what to do with it, as our client is still busy crunching the data from the
first poll. So far so good. What happens, though, is that the pre-fetched
data gets thrown away upon calling poll because it is "no longer
fetchable...". Maybe I am not fully understanding your suggested approach,
but I don't think it would solve this problem.
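
To make the timeline concrete, here is a minimal toy model of it in Python.
It is only a sketch and not the real client: `ToyConsumer` and all of its
fields are hypothetical names, there is a single partition, and a fetch
response is assumed to arrive before the next poll. The only behaviour it
models is the one I am worried about, namely that poll drops whatever was
prefetched for a paused partition.

```python
from collections import deque


class ToyConsumer:
    """Toy stand-in for a consumer with a pipelining fetcher (not the Kafka client)."""

    def __init__(self, log, fetch_size=2):
        self.log = log                # records available on the single partition
        self.fetch_size = fetch_size  # records per simulated fetch response
        self.next_fetch_offset = 0    # next offset a fetch request will ask for
        self.prefetched = deque()     # offsets fetched but not yet returned by poll()
        self.paused = False
        self.discarded = 0            # prefetched records dropped while paused
        self.fetches_sent = 0

    def _fetch(self):
        # Simulate one fetch request whose response arrives before the next poll().
        end = min(self.next_fetch_offset + self.fetch_size, len(self.log))
        self.prefetched.extend(range(self.next_fetch_offset, end))
        self.next_fetch_offset = end
        self.fetches_sent += 1

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def poll(self):
        if self.paused:
            # Assumed behaviour under discussion: data prefetched for a paused
            # partition is dropped and must be fetched again after resume().
            if self.prefetched:
                self.discarded += len(self.prefetched)
                self.next_fetch_offset = self.prefetched[0]
                self.prefetched.clear()
            return []
        if not self.prefetched:
            self._fetch()
        records = [self.log[offset] for offset in self.prefetched]
        self.prefetched.clear()
        self._fetch()  # pipeline the next fetch while the caller is busy
        return records


consumer = ToyConsumer(log=["r0", "r1", "r2", "r3", "r4", "r5"])
first = consumer.poll()         # T0: data goes into our own buffer
consumer.pause()                # we are busy, so pause the partition
heartbeat = consumer.poll()     # T0 + 1s: poll only to stay alive
consumer.resume()               # our buffer is empty again
after_resume = consumer.poll()  # the dropped records are fetched a second time
```

In this run the records at offsets 2 and 3 are fetched twice: once by the
pipelined request after the first poll, and once more after resume.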

Zahari

On Fri, Oct 19, 2018 at 8:10 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Just my 2 cents.
>
> I am not 100% sure if we would need to change the consumer for this.
> While I still think that KIP-349 might be valuable, it seems to be
> complementary/orthogonal to the issue discussed here.
>
> For Kafka Streams, we have a related scenario, and what Kafka Streams
> does is add its own buffer on top of the consumer. Thus, for each
> `poll()` all data is put into this buffer, and now Streams can decide
> which record to process first. For buffers that have data, we can call
> `pause()` without losing fetched data (not a 100% guarantee, depending
> on fetch size, `max.poll.records`, etc., but with high probability),
> and if a buffer becomes empty we `resume()` partitions.
>
> As Akka Streams builds on top of the consumer, it could implement a
> similar pattern. Of course, one cannot use `auto.commit` on the
> consumer; commits need to be managed manually (i.e., only data that
> was taken out of the buffer and actually processed can be committed).
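
A minimal sketch of that buffering pattern for a single partition, assuming
a hypothetical `PartitionBuffer` class (none of these names come from Kafka
Streams or the consumer API). The buffer tells the caller when the partition
can stay paused and which offset is safe to commit manually.

```python
from collections import deque


class PartitionBuffer:
    """Hypothetical client-side buffer for one partition, layered over a consumer."""

    def __init__(self):
        self.records = deque()      # (offset, value) pairs handed over by poll()
        self.last_processed = None  # offset of the last record actually processed

    def add(self, polled):
        # Store everything a poll() returned for this partition.
        self.records.extend(polled)

    def should_pause(self):
        # While buffered data remains, the partition can stay paused;
        # once the buffer is empty the caller would resume it.
        return bool(self.records)

    def process_next(self, handler):
        offset, value = self.records.popleft()
        handler(value)
        self.last_processed = offset

    def committable_offset(self):
        # A committed offset names the next record to read, hence the +1.
        # Returns None while nothing has been processed yet.
        if self.last_processed is None:
            return None
        return self.last_processed + 1


buf = PartitionBuffer()
buf.add([(10, "a"), (11, "b"), (12, "c")])
seen = []
buf.process_next(seen.append)  # only offset 10 has been processed so far
```

After this single step the buffer still holds two records, so the partition
would remain paused, and the only offset that may be committed is 11.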
>
> For the `MessageChooser` idea, I also still think it might be useful,
> but it's unclear to me if this should be a consumer feature or built on
> top of the consumer (maybe it could be a Streams feature, as Streams is
> built on top of the consumer). Thoughts?
>
>
> -Matthias
>
> On 10/18/18 9:27 AM, Jan Filipiak wrote:
> > The idea for you would be that MessageChooser could hang on to the
> > prefetched messages.
> >
> > ccing cmcc...@apache.org
> >
> > @Collin
> > just for you to see that MessageChooser is a powerful abstraction.
> >
> > :)
> >
> > Best jan
> >
> > On 18.10.2018 13:59, Zahari Dichev wrote:
> >> Jan,
> >>
> >> Quite insightful indeed. I think your propositions are valid.
> >>
> >> Ryanne,
> >>
> >> I understand that consumers are using a pull model... And yes, indeed, if
> >> a consumer is not ready for more records it surely should not call poll.
> >> Except that it needs to do so periodically in order to indicate that it
> >> is alive. Forget about the "backpressure"; I guess I phrased this badly,
> >> so let's not get caught up on it.
> >>
> >> You say pause/resume can be used to prioritise certain topics/partitions
> >> over others. And indeed this is the case. So instead of thinking about it
> >> in terms of backpressure, let's put it a different way. The Akka Streams
> >> connector would like to prioritise certain topics over others, using one
> >> consumer instance. On top of that, add the detail that the priorities
> >> change quite frequently (which translates to calling pause/resume
> >> frequently). All that being said, what would be a proper way to handle
> >> the situation without throwing the pre-fetched records away when calling
> >> poll on a consumer that happens to have a topic that was recently paused
> >> (and that might be un-paused soon)? Am I the only one who considers that
> >> an actual problem with the use of pause/resume? Not sure how to explain
> >> the situation in a better way.
> >>
> >> Zahari
> >>
> >>
> >> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com>
> >> wrote:
> >>
> >>> Thanks a lot Jan,
> >>>
> >>> I will read it.
> >>>
> >>> Zahari
> >>>
> >>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com>
> >>> wrote:
> >>>
> >>>> especially my suggestions ;)
> >>>>
> >>>> On 18.10.2018 08:30, Jan Filipiak wrote:
> >>>>> Hi Zahari,
> >>>>>
> >>>>> would you be willing to scan through the KIP-349 discussion a little?
> >>>>> I think it has suggestions that could be interesting for you
> >>>>>
> >>>>> Best Jan
> >>>>>
> >>>>> On 16.10.2018 09:29, Zahari Dichev wrote:
> >>>>>> Hi there Kafka developers,
> >>>>>>
> >>>>>> I am currently trying to find a solution to an issue that has been
> >>>>>> manifesting itself in the Akka Streams implementation of the Kafka
> >>>>>> connector. When it comes to consuming messages, the implementation
> >>>>>> relies heavily on the fact that we can pause and resume partitions.
> >>>>>> In some situations, when a single consumer instance is shared among
> >>>>>> several streams, we might end up frequently pausing and unpausing a
> >>>>>> set of topic partitions, which is the main facility that allows us to
> >>>>>> implement back pressure. This, however, has certain disadvantages,
> >>>>>> especially when there are two consumers that differ in terms of
> >>>>>> processing speed.
> >>>>>>
> >>>>>> To articulate the issue more clearly, imagine that a consumer
> >>>>>> maintains assignments for two topic partitions, *TP1* and *TP2*. This
> >>>>>> consumer is shared by two streams, S1 and S2. So effectively, when we
> >>>>>> have demand from only one of the streams, *S1*, we will pause one of
> >>>>>> the topic partitions, *TP2*, and call *poll()* on the consumer to
> >>>>>> retrieve only the records for the demanded topic partition, *TP1*. The
> >>>>>> result is that all the records that have been prefetched for *TP2*
> >>>>>> are now thrown away by the fetcher (*"Not returning fetched records
> >>>>>> for assigned partition TP2 since it is no longer fetchable"*). If we
> >>>>>> extrapolate that to multiple streams sharing the same consumer, we
> >>>>>> might quickly end up in a situation where we throw away prefetched
> >>>>>> data quite often. This does not seem like the most efficient approach
> >>>>>> and in fact produces quite a lot of overlapping fetch requests, as
> >>>>>> illustrated in the following issue:
> >>>>>>
> >>>>>> https://github.com/akka/alpakka-kafka/issues/549
> >>>>>>
> >>>>>> I am writing this email to get some initial opinions on a KIP I was
> >>>>>> thinking about. What if we give the clients of the Consumer API a bit
> >>>>>> more control over what to do with this prefetched data? Two options I
> >>>>>> am wondering about:
> >>>>>>
> >>>>>> 1. Introduce a configuration setting, such as
> >>>>>> *"return-prefetched-data-for-paused-topic-partitions = false"* (have
> >>>>>> to think of a better name), which when set to true will return what
> >>>>>> is prefetched instead of throwing it away on calling *poll()*. Since
> >>>>>> the amount of data is bounded by the maximum size of the prefetch, we
> >>>>>> can control the maximum number of records returned. The client of the
> >>>>>> consumer API can then be responsible for keeping that data around and
> >>>>>> using it when appropriate (i.e. when demand is present).
> >>>>>>
> >>>>>> 2. Introduce a facility to pass in a buffer into which the prefetched
> >>>>>> records are drained when poll is called and paused partitions have
> >>>>>> some prefetched records.
> >>>>>>
> >>>>>> Any opinions on the matter are welcome. Thanks a lot!
> >>>>>>
> >>>>>> Zahari Dichev
> >>>>>>
> >>>>
> >>>
> >>
>
>
