On Tue, Oct 23, 2018, at 12:38, Zahari Dichev wrote:
> Hi there Matthias, I looked through the code of Kafka Streams. Quite
> impressive work ! If I have to put the logic of buffering within the
> context of what we are doing in Akka though, I might end up with the
> following situation.
> 
> 1. Poll is called with two partition being active *TP1, TP2*
> 2. We get some data for both, both of them also prefetch some data.
> 3. So now we have some data that we have obtained and some data that sits
> with the buffer of the fetcher, waiting to be obtained.
> 4. We put the data that we have obtained from the poll into the respective
> buffers of the partitions.
> 5. Since both of our buffers are "full", we call pause on both *TP1* and
> *TP2*.
> 6. A little time has passed and the client of *TP1* has processed all its
> records from the buffer, while the one of *TP2* has processed none
> 7. Buffer of *TP1* gets empty, we call resume on *TP1*
> 8. We call poll again with *TP1* resumed and *TP2* paused.
> 9. We get some records for TP1 and we throw away all the records that were
> prefetched for *TP2* in step 2
> 
> This can go on and on and due to the dynamic nature of the speed of
> processing records and the theoretically unlimited number of topic
> partitions, I find it possible that this scenario can happen more than once
> over the lifetime of a client. And instead of trying to calculate the
> probability of this happening and attempt to minimise it, I would prefer to
> have one of two options:
> 
> 1. Having control to allow me to enable the returning of already prefetched
> data, and simply store it in a buffer of my own until I have enough
> capacity to deal with it
> 
> OR
> 
> 2. Keep the data in the fetcher and not throw it away but use it on the
> next poll (not sure how viable that is as I have not looked at the details
> of it all).

I haven't thought about it that hard, but it sounds like the second option 
might be better.  I have a hard time thinking of a case where we actually want 
to throw away data for paused partitions.  If you're still subscribed to it, 
presumably you'll eventually unpause it and use the cache, right?  It makes 
sense for unsubscribe to clear those records, but not pause, as far as I can 
see.

best,
Colin


> 
> The first option is what I suggested initially and the second option is the
> one that will allow us to skip the introduction of a configuration
> parameter as Colin suggested. These are the things I can suggest at the
> moment. As mentioned, I am willing to carry out the work. There is also an
> official discussion thread, but I guess we have deviated from that, so I
> can just put that current on in JIRA instead if that is OK ?
> 
> Matthias, regarding how the fetcher works. From what I have looked at,
> whenever the consumer polls and returns some data, we immediately issue
> another fetch request that delivered us records that are returned on the
> next poll. All these fetched records, that have not made it to the caller
> of poll but have been fetched are thrown away in case at the time of the
> nest poll() the partition is in paused state. This is what is causing the
> inefficiency.
> 
> Any more comments are welcome.
> 
> On Mon, Oct 22, 2018 at 6:00 AM Ismael Juma <isma...@gmail.com> wrote:
> 
> > Hi,
> >
> > I think a KIP to discuss a concrete proposal makes sense. One suggestion is
> > to explore the possibility of fixing the issue without a new config. Would
> > that break existing users? Generally, we should strive for avoiding configs
> > if at all possible.
> >
> > Ismael
> >
> > On 16 Oct 2018 12:30 am, "Zahari Dichev" <zaharidic...@gmail.com> wrote:
> >
> > Hi there Kafka developers,
> >
> > I am currently trying to find a solution to an issue that has been
> > manifesting itself in the Akka streams implementation of the Kafka
> > connector. When it comes to consuming messages, the implementation relies
> > heavily on the fact that we can pause and resume partitions. In some
> > situations when a single consumer instance is shared among several streams,
> > we might end up with frequently pausing and unpausing a set of topic
> > partitions, which is the main facility that allows us to implement back
> > pressure. This however has certain disadvantages, especially when there are
> > two consumers that differ in terms of processing speed.
> >
> > To articulate the issue more clearly, imagine that a consumer maintains
> > assignments for two topic partitions *TP1* and *TP2*. This consumer is
> > shared by two streams - S1 and S2. So effectively when we have demand from
> > only one of the streams - *S1*, we will pause one of the topic partitions
> > *TP2* and call *poll()* on the consumer to only retrieve the records for
> > the demanded topic partition - *TP1*. The result of that is all the records
> > that have been prefetched for *TP2* are now thrown away by the fetcher
> > ("*Not
> > returning fetched records for assigned partition TP2 since it is no longer
> > fetchable"*). If we extrapolate that to multiple streams sharing the same
> > consumer, we might quickly end up in a situation where we throw prefetched
> > data quite often. This does not seem like the most efficient approach and
> > in fact produces quite a lot of overlapping fetch requests as illustrated
> > in the following issue:
> >
> > https://github.com/akka/alpakka-kafka/issues/549
> >
> > I am writing this email to get some initial opinion on a KIP I was thinking
> > about. What if we give the clients of the Consumer API a bit more control
> > of what to do with this prefetched data. Two options I am wondering about:
> >
> > 1. Introduce a configuration setting, such as*
> > "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> > think of a better name), which when set to true will return what is
> > prefetched instead of throwing it away on calling *poll()*. Since this is
> > amount of data that is bounded by the maximum size of the prefetch, we can
> > control what is the most amount of records returned. The client of the
> > consumer API can then be responsible for keeping that data around and use
> > it when appropriate (i.e. when demand is present)
> >
> > 2. Introduce a facility to pass in a buffer into which the prefetched
> > records are drained when poll is called and paused partitions have some
> > prefetched records.
> >
> > Any opinions on the matter are welcome. Thanks a lot !
> >
> >
> > Zahari Dichev
> >

Reply via email to