Kirk True created KAFKA-15641:
---------------------------------

             Summary: Investigate CompletedFetch handleInitializeErrors for accuracy
                 Key: KAFKA-15641
                 URL: https://issues.apache.org/jira/browse/KAFKA-15641
             Project: Kafka
          Issue Type: Improvement
          Components: clients, consumer
            Reporter: Kirk True
            Assignee: Kirk True

The interaction between {{FetchBuffer}}, {{FetchCollector}}, and {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there are three issues.

First:
{quote}This is an existing issue. But the way we handle paused partitions in {{collectFetch}} seems problematic. The application thread first calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls {{fetchBuffer.addAll(pausedCompletedFetches)}}. This could leave a brief window where the paused partition is not included in either {{nextInLineFetch}} or {{completedFetches}}. If the background thread kicks in during that window, it could fetch another chunk for that partition and add the response back to {{FetchBuffer}}. This would violate the assumption that there is no more than one pending {{CompletedFetch}} per partition in {{FetchBuffer}} and could cause records to be returned out of offset order or duplicates to be returned.
{quote}
Second:
{quote}The second existing issue is on the {{fetchBuffer.setNextInLineFetch}} call in {{collectFetch}}. The issue is that after all records are drained from {{nextInLineFetch}}, we only call {{setNextInLineFetch}} when there is a new {{completedFetch}}. However, until the drained {{completedFetch}} is removed from {{nextInLineFetch}}, the background thread can't fetch the next chunk. So, it seems that we will just be stuck here.
{quote}
Third:
{quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}} are separate operations and we expect the caller to call them in the right order to avoid a partition missing in {{FetchBuffer}} in the transition phase. It still leaves us with the situation that a partition could be in both {{completedFetches}} and {{nextInLineFetch}} at a particular time. It's not a problem for now, but it may be in the future. Could we make them an atomic operation? If not, could we add a comment to document the correct usage of the API and the impact of a partition being duplicated in {{completedFetches}} and {{nextInLineFetch}}?
{quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
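
To make the "atomic operation" question in the third comment concrete, here is a minimal sketch of one possible shape for it. It is not Kafka's {{FetchBuffer}}: the class {{AtomicFetchBufferSketch}} and its methods {{advance}}, {{requeueNextInLine}}, {{nextInLine}}, and {{queued}} are hypothetical names invented for illustration, and the type parameter {{T}} stands in for {{CompletedFetch}}. The sketch assumes that a single monitor guards both the queue and the next-in-line slot, and that the caller invokes {{advance}} only once the current next-in-line fetch is fully drained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, NOT Kafka's FetchBuffer. T stands in for CompletedFetch.
final class AtomicFetchBufferSketch<T> {
    private final Deque<T> completedFetches = new ArrayDeque<>();
    private T nextInLineFetch;

    // Called by the background thread when a fetch response arrives.
    synchronized void add(T fetch) {
        completedFetches.addLast(fetch);
    }

    // Combines poll() and setNextInLineFetch(...) into one step. Under the
    // monitor, the head of the queue becomes the next-in-line fetch, so no
    // other thread can see it in neither place or in both places.
    // If the queue is empty, the (assumed drained) next-in-line slot is
    // cleared and null is returned.
    synchronized T advance() {
        nextInLineFetch = completedFetches.pollFirst();
        return nextInLineFetch;
    }

    // Moves a paused next-in-line fetch back to the queue in one step,
    // instead of setNextInLineFetch(null) followed by a separate addAll(...).
    synchronized void requeueNextInLine() {
        if (nextInLineFetch != null) {
            completedFetches.addLast(nextInLineFetch);
            nextInLineFetch = null;
        }
    }

    synchronized T nextInLine() {
        return nextInLineFetch;
    }

    synchronized int queued() {
        return completedFetches.size();
    }
}
```

Under these assumptions, a background thread that takes the same monitor to check which partitions are buffered would never observe the intermediate state described in the first comment, and clearing the slot when the queue is empty would address the stall described in the second. Whether one lock is acceptable on the real fetch path is a separate question that this sketch does not answer.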