Hi there Kafka developers,

I am currently trying to find a solution to an issue that has been
manifesting itself in the Akka Streams implementation of the Kafka
connector. When it comes to consuming messages, the implementation relies
heavily on the ability to pause and resume partitions. When a single
consumer instance is shared among several streams, we may end up
frequently pausing and resuming a set of topic partitions, which is the
main facility that allows us to implement back pressure. This, however,
has certain disadvantages, especially when two streams differ in
processing speed.

To articulate the issue more clearly, imagine that a consumer holds
assignments for two topic partitions, *TP1* and *TP2*, and is shared by
two streams, *S1* and *S2*. When there is demand from only one of the
streams, *S1*, we pause the other topic partition, *TP2*, and call
*poll()* on the consumer to retrieve records only for the demanded topic
partition, *TP1*. As a result, all the records that have been prefetched
for *TP2* are thrown away by the fetcher ("*Not returning fetched records
for assigned partition TP2 since it is no longer fetchable*"). If we
extrapolate that to multiple streams sharing the same consumer, we can
quickly end up in a situation where we throw away prefetched data quite
often. This does not seem like the most efficient approach, and in fact
it produces quite a lot of overlapping fetch requests, as illustrated in
the following issue:

https://github.com/akka/alpakka-kafka/issues/549
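To make the waste concrete, here is a deliberately tiny in-memory model of the discard behaviour described above. It is not the real Kafka Fetcher; class and method names (ToyFetcher, prefetch, poll) are purely illustrative, and records are plain strings.

```java
import java.util.*;

// Toy model (NOT the real Kafka Fetcher) of how records prefetched for a
// paused partition are dropped on poll(), mirroring the "no longer
// fetchable" log line quoted above.
class ToyFetcher {
    private final Map<String, Deque<String>> prefetched = new HashMap<>();
    private final Set<String> paused = new HashSet<>();
    int discarded = 0;  // records thrown away across all polls

    void prefetch(String tp, List<String> records) {
        prefetched.computeIfAbsent(tp, k -> new ArrayDeque<>()).addAll(records);
    }

    void pause(String tp) { paused.add(tp); }

    // Mimics poll(): returns records for fetchable partitions and
    // silently discards whatever was prefetched for paused ones.
    Map<String, List<String>> poll() {
        Map<String, List<String>> out = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : prefetched.entrySet()) {
            if (paused.contains(e.getKey())) {
                discarded += e.getValue().size();  // "no longer fetchable"
            } else {
                out.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
            e.getValue().clear();
        }
        return out;
    }
}
```

In the S1/S2 scenario above, pausing *TP2* and polling returns *TP1*'s records but counts *TP2*'s prefetched records as discarded, so the next fetch for *TP2* has to request them all over again.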

I am writing this email to get some initial opinions on a KIP I have been
thinking about. What if we gave clients of the Consumer API a bit more
control over what happens to this prefetched data? Two options I am
wondering about:

1. Introduce a configuration setting, such as
*return-prefetched-data-for-paused-topic-partitions* (a better name is
needed), which, when enabled, makes *poll()* return what has been
prefetched for paused partitions instead of throwing it away. Since the
amount of data involved is bounded by the maximum prefetch size, we
retain control over the maximum number of records returned. The client of
the Consumer API would then be responsible for keeping that data around
and using it when appropriate (i.e. when demand is present).

2. Introduce a facility to pass in a buffer into which prefetched records
are drained when *poll()* is called and paused partitions have
prefetched records.
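A rough sketch of what option 2 could look like, again as an in-memory model rather than the real Consumer API; *pollInto* and *PrefetchBuffer* are hypothetical names I am using for illustration only:

```java
import java.util.*;

// Sketch of option 2: instead of dropping records prefetched for paused
// partitions, a poll variant drains them into a caller-supplied buffer.
// All names here (PrefetchBuffer, pollInto) are hypothetical.
class PrefetchBuffer {
    private final Map<String, Deque<String>> byPartition = new HashMap<>();

    void add(String tp, Collection<String> records) {
        byPartition.computeIfAbsent(tp, k -> new ArrayDeque<>()).addAll(records);
    }

    // The stream drains a partition's buffered records once demand returns.
    List<String> drain(String tp) {
        Deque<String> q = byPartition.remove(tp);
        return q == null ? List.of() : new ArrayList<>(q);
    }
}

class BufferingToyConsumer {
    private final Map<String, Deque<String>> prefetched = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    void prefetch(String tp, List<String> records) {
        prefetched.computeIfAbsent(tp, k -> new ArrayDeque<>()).addAll(records);
    }

    void pause(String tp)  { paused.add(tp); }
    void resume(String tp) { paused.remove(tp); }

    // Poll variant that salvages paused partitions' records into `buffer`
    // instead of discarding them.
    Map<String, List<String>> pollInto(PrefetchBuffer buffer) {
        Map<String, List<String>> out = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : prefetched.entrySet()) {
            if (paused.contains(e.getKey())) {
                buffer.add(e.getKey(), e.getValue());
            } else {
                out.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
            e.getValue().clear();
        }
        return out;
    }
}
```

The appeal of this shape is that the buffered data stays under the caller's control: a stream that regains demand drains its partition's records locally instead of triggering a fresh, overlapping fetch request.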

Any opinions on the matter are welcome. Thanks a lot!

Zahari Dichev
