[ https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot reassigned KAFKA-12330:
-----------------------------------

Assignee: David Jacot

> FetchSessionCache may cause starvation for partitions when FetchResponse is full
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-12330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12330
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Bradstreet
>            Assignee: David Jacot
>            Priority: Major
>
> Incremental FetchSessionCache sessions deprioritize partitions for which a
> response is returned. This happens when log metadata such as the log start
> offset or high watermark (hwm) is returned, or when data for that partition
> is returned.
> When a fetch response fills to maxBytes, data may not be returned for some
> partitions even though their fetch offset is lower than the fetch upper
> bound. However, the response will still contain updates to metadata such as
> the hwm if that metadata has changed. This can lead to degenerate behavior
> where an update to a partition's hwm or log start offset causes the
> partition to be unnecessarily skipped in the next fetch. At first this
> appeared even worse, since hwm updates occur frequently, but starvation
> blocks hwm movement for the starved partition, which eventually lets a
> fetch go through and unsticks it. Even so, it takes one more fetch request
> than necessary. Consumers may be affected more than replica fetchers;
> however, consumers often remove partitions with fetched data from the next
> fetch request, which may help prevent starvation.
> I believe we should only reorder the partition fetch priority if data is
> actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
>                                 val updateFetchContextAndRemoveUnselected: Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
>
>   override def hasNext: Boolean = {
>     while ((nextElement == null) && iter.hasNext) {
>       val element = iter.next()
>       val topicPart = element.getKey
>       val respData = element.getValue
>       val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
>       val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
>       if (mustRespond) {
>         nextElement = element
>         // Example POC change:
>         // Don't move the partition to the end of the queue if we didn't
>         // actually fetch data. This should help avoid starvation even when
>         // we fill the fetch response fully while returning metadata for
>         // these partitions.
>         if (updateFetchContextAndRemoveUnselected && respData.records != null
>             && respData.records.sizeInBytes > 0) {
>           session.partitionMap.remove(cachedPart)
>           session.partitionMap.mustAdd(cachedPart)
>         }
>       } else {
>         if (updateFetchContextAndRemoveUnselected) {
>           iter.remove()
>         }
>       }
>     }
>     nextElement != null
>   }
> }
> {noformat}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
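
To make the starvation scenario above concrete, here is a minimal,
self-contained Scala sketch of the queue reordering. It is not Kafka code:
the partition names, per-partition sizes, the 100-byte budget, and the
simulate helper are all hypothetical, and it assumes the hwm moves every
round so that a metadata-only response entry is always produced for
partitions whose data did not fit in the response.

{noformat}
import scala.collection.mutable

object FetchReorderSketch {
  // Simulate `rounds` fetch responses against a queue of three partitions,
  // each with 60 bytes of pending data and a 100-byte response budget, so
  // only the partition at the head of the queue fits in each response.
  // reorderOnMetadataOnly = true models the current behavior (any response
  // entry moves the partition to the back of the queue); false models the
  // proposed fix (only a data response moves it).
  def simulate(reorderOnMetadataOnly: Boolean, rounds: Int): Seq[String] = {
    val maxBytes = 100
    val partitionBytes = 60
    val queue = mutable.ArrayDeque("p0", "p1", "p2")
    val served = mutable.Buffer.empty[String]

    for (_ <- 1 to rounds) {
      var budget = maxBytes
      val moveToBack = mutable.Buffer.empty[String]
      for (tp <- queue) {
        if (budget >= partitionBytes) {
          budget -= partitionBytes
          served += tp     // data returned for this partition
          moveToBack += tp // reordered in both variants
        } else if (reorderOnMetadataOnly) {
          moveToBack += tp // metadata-only entry: reordered anyway (the bug)
        }
        // else: metadata-only entry, partition keeps its queue position
      }
      moveToBack.foreach { tp =>
        queue -= tp
        queue += tp
      }
    }
    served.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Current behavior: p0 is served every round; p1 and p2 starve because
    // their metadata-only entries keep pushing them to the back behind p0.
    println(s"current : ${simulate(reorderOnMetadataOnly = true, rounds = 4)}")
    // Proposed fix: unserved partitions keep their position, advance to the
    // head as served partitions move back, and each gets data in turn.
    println(s"proposed: ${simulate(reorderOnMetadataOnly = false, rounds = 4)}")
  }
}
{noformat}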