[ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-12330:
-----------------------------------

    Assignee: David Jacot

> FetchSessionCache may cause starvation for partitions when FetchResponse is 
> full
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12330
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Assignee: David Jacot
>            Priority: Major
>
> Incremental fetch sessions in FetchSessionCache deprioritize any partition 
> for which a response is returned. This happens when log metadata such as the 
> log start offset or high watermark (hwm) is returned, or when data for that 
> partition is returned.
> When a fetch response fills to maxBytes, data may not be returned for some 
> partitions even if their fetch offset is lower than the fetch upper bound. 
> However, the fetch response will still contain metadata updates such as the 
> hwm if that metadata has changed. This can lead to degenerate behavior where 
> an update to a partition's hwm or log start offset results in the next fetch 
> being unnecessarily skipped for that partition. At first this appeared to be 
> worse, as hwm updates occur frequently, but starvation should block hwm 
> movement, which allows a fetch to go through and the partition to become 
> unstuck. However, this still requires one more fetch request than 
> necessary. Consumers may be affected more than replica fetchers; however, 
> they often remove partitions with fetched data from the next fetch request, 
> which may help prevent starvation.
> I believe we should only reorder the partition fetch priority if data is 
> actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
>                                 val updateFetchContextAndRemoveUnselected: Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
>   override def hasNext: Boolean = {
>     while ((nextElement == null) && iter.hasNext) {
>       val element = iter.next()
>       val topicPart = element.getKey
>       val respData = element.getValue
>       val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
>       val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
>       if (mustRespond) {
>         nextElement = element
>         // Example POC change:
>         // Don't move partition to end of queue if we didn't actually fetch data.
>         // This should help avoid starvation even when we are filling the fetch
>         // response fully while returning metadata for these partitions.
>         if (updateFetchContextAndRemoveUnselected && respData.records != null &&
>             respData.records.sizeInBytes > 0) {
>           session.partitionMap.remove(cachedPart)
>           session.partitionMap.mustAdd(cachedPart)
>         }
>       } else {
>         if (updateFetchContextAndRemoveUnselected) {
>           iter.remove()
>         }
>       }
>     }
>     nextElement != null
>   }{noformat}
>  
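
The reordering behavior described in the issue can be illustrated with a toy model. What follows is a minimal, self-contained Scala sketch and not Kafka code: `FetchPrioritySketch`, `PartitionResponse`, `simulate`, and all of their parameters are hypothetical names chosen for illustration. It assumes, as a simplified worst case, that every partition's metadata changes on every fetch (so every partition always appears in the response), that every partition always has `bytesPerPartition` bytes available, and that the response is filled in session order until `maxBytes` is exhausted.

```scala
object FetchPrioritySketch {
  // Toy stand-in for one partition's slice of a fetch response (hypothetical type).
  final case class PartitionResponse(partition: String, recordBytes: Int, metadataChanged: Boolean)

  /** Runs `rounds` fetches and returns, for each round, the partitions that received record data. */
  def simulate(initialOrder: Vector[String],
               bytesPerPartition: Int,
               maxBytes: Int,
               rounds: Int,
               reorderOnlyOnData: Boolean): Vector[Vector[String]] = {
    var order = initialOrder
    val served = Vector.newBuilder[Vector[String]]
    for (_ <- 1 to rounds) {
      var remaining = maxBytes
      // Build the response in session order. Once the byte budget is used up,
      // later partitions carry metadata only (zero record bytes).
      val responses = order.map { p =>
        val bytes = if (remaining >= bytesPerPartition) bytesPerPartition else 0
        remaining -= bytes
        // Assumption: metadata (for example the hwm) changes on every round.
        PartitionResponse(p, bytes, metadataChanged = true)
      }
      served += responses.filter(_.recordBytes > 0).map(_.partition)
      // Move partitions to the end of the queue according to the chosen policy.
      for (r <- responses) {
        val mustRespond = r.recordBytes > 0 || r.metadataChanged
        val shouldMove = if (reorderOnlyOnData) r.recordBytes > 0 else mustRespond
        if (shouldMove) order = order.filterNot(_ == r.partition) :+ r.partition
      }
    }
    served.result()
  }
}
```

Under these assumptions, calling `FetchPrioritySketch.simulate(Vector("A", "B", "C", "D"), 100, 200, 3, reorderOnlyOnData = false)` serves data for A and B in every round, because moving every responding partition to the end in iteration order leaves the queue unchanged. With `reorderOnlyOnData = true`, the rounds serve A and B, then C and D, then A and B again. For real replica fetchers the issue notes that hwm movement eventually stalls and unblocks the starved partitions, so this sketch exaggerates the effect by keeping metadata changing indefinitely.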



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
