[ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-12330:
-------------------------------------
    Description: 
Incremental fetch sessions (via the FetchSessionCache) deprioritize partitions for which a response is returned. A response is returned for a partition either when log metadata such as the log start offset or high watermark (hwm) has changed, or when data for that partition is returned.

When a fetch response fills up to maxBytes, data may not be returned for partitions where it is available. However, the fetch response will still contain updates to metadata such as the hwm if that metadata has changed. This can lead to degenerate behavior where a partition's hwm or log start offset is updated, and as a result the partition is deprioritized and unnecessarily skipped in the next fetch. At first this appeared worse than it is, since hwm updates occur frequently; in practice, starvation should eventually block hwm movement, which allows a fetch for the partition to go through and unstick it. However, that still requires one more fetch request than necessary. Consumers may be affected more than replica fetchers, but consumers often remove partitions for which data was fetched from their next fetch request, and this may help prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }
{noformat}
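
To make the ordering effect concrete, here is a standalone toy sketch (not Kafka code; the object FetchReorderSketch and its Partition/fetchRound names are made up for illustration) that models a size-capped fetch over an ordered list of partitions. It keeps metadata changing every round to show the worst case: with the current behaviour, a partition that only gets a metadata update is still moved to the back and keeps missing out on data, while moving only partitions that actually returned records lets a starved partition drift to the front and be served on a later round.
{noformat}
import scala.collection.mutable

// Toy model only: simulates how the session's partition ordering interacts with
// a size-capped fetch response. All names here are hypothetical, not Kafka's.
object FetchReorderSketch {
  // availableBytes: bytes ready to fetch; hwmChanged: a metadata-only update is pending
  final case class Partition(name: String, availableBytes: Int, hwmChanged: Boolean = true)

  // Runs one simulated fetch round over `order`; returns (new order, partitions served data).
  def fetchRound(order: Vector[Partition],
                 maxBytes: Int,
                 deprioritizeMetadataOnly: Boolean): (Vector[Partition], Set[String]) = {
    var budget = maxBytes
    val gotData = mutable.LinkedHashSet.empty[String]
    val responded = mutable.LinkedHashSet.empty[String]
    order.foreach { p =>
      if (p.availableBytes > 0 && p.availableBytes <= budget) {
        budget -= p.availableBytes          // records fit into the response
        gotData += p.name
        responded += p.name
      } else if (p.hwmChanged) {
        responded += p.name                 // metadata-only entry in the response
      }
    }
    // Current behaviour: every partition that responded is moved to the back.
    // Proposed behaviour: only partitions that actually returned records move.
    val movers: Set[String] = if (deprioritizeMetadataOnly) responded.toSet else gotData.toSet
    val (back, front) = order.partition(p => movers.contains(p.name))
    (front ++ back, gotData.toSet)
  }

  def main(args: Array[String]): Unit = {
    val start = Vector(Partition("p0", 60), Partition("p1", 60), Partition("p2", 60))
    for (onlyReorderOnData <- Seq(false, true)) {
      var order = start
      println(s"only reorder on data = $onlyReorderOnData")
      for (round <- 1 to 3) {
        val (next, served) = fetchRound(order, maxBytes = 100,
          deprioritizeMetadataOnly = !onlyReorderOnData)
        println(s"  round $round: served=$served, next order=${next.map(_.name).mkString(",")}")
        order = next
      }
    }
  }
}
{noformat}
Running it, the current reordering serves only p0 in every round because the metadata-only responses keep pushing p1 and p2 back along with it, while the proposed rule serves p0, p1 and p2 in rotation.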
 

  was:
The incremental FetchSessionCache sessions deprioritizes partitions where a 
response is returned. This may happen if log metadata such as log start offset, 
hwm, etc is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions where it's available. However, the fetch response will still contain 
updates to metadata such as hwm if that metadata has changed. This can lead to 
degenerate behavior where a partition's hwm or log start offset is updated 
resulting in the next fetch being unnecessarily skipped for that partition. At 
first this appeared to be worse, as hwm updates occur frequently, but 
starvation should result in hwm movement becoming blocked, allowing a fetch to 
go through and then becoming unstuck. However, it'll still require one more 
fetch request than necessary to do so.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.

 
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }
{noformat}
 


> FetchSessionCache may cause starvation for partitions when FetchResponse is 
> full
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12330
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Priority: Major
>
> The incremental FetchSessionCache sessions deprioritizes partitions where a 
> response is returned. This may happen if log metadata such as log start 
> offset, hwm, etc is returned, or if data for that partition is returned.
> When a fetch response fills to maxBytes, data may not be returned for 
> partitions where it's available. However, the fetch response will still 
> contain updates to metadata such as hwm if that metadata has changed. This 
> can lead to degenerate behavior where a partition's hwm or log start offset 
> is updated resulting in the next fetch being unnecessarily skipped for that 
> partition. At first this appeared to be worse, as hwm updates occur 
> frequently, but starvation should result in hwm movement becoming blocked, 
> allowing a fetch to go through and then becoming unstuck. However, it'll 
> still require one more fetch request than necessary to do so. Consumers may 
> be affected more than replica fetchers, however they often remove partitions 
> with fetched data from the next fetch request and this may be helping prevent 
> starvation.
> I believe we should only reorder the partition fetch priority if data is 
> actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
>                                 val updateFetchContextAndRemoveUnselected: Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
>
>   override def hasNext: Boolean = {
>     while ((nextElement == null) && iter.hasNext) {
>       val element = iter.next()
>       val topicPart = element.getKey
>       val respData = element.getValue
>       val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
>       val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
>       if (mustRespond) {
>         nextElement = element
>         // Don't move partition to end of queue if we didn't actually fetch data
>         // This should help avoid starvation even when we are filling the fetch
>         // response fully while returning metadata for these partitions
>         if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
>           session.partitionMap.remove(cachedPart)
>           session.partitionMap.mustAdd(cachedPart)
>         }
>       } else {
>         if (updateFetchContextAndRemoveUnselected) {
>           iter.remove()
>         }
>       }
>     }
>     nextElement != null
>   }
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
