[ https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-15640:
--
Description:
The interaction between {{FetchBuffer}}, {{FetchCollector}}, and
{{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments
[here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and
[here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132], there
are three issues:
First:
{quote}This is an existing issue, but the way we handle paused
partitions in {{collectFetch}} seems problematic. The application thread first
calls {{fetchBuffer.setNextInLineFetch(null)}} and then calls
{{fetchBuffer.addAll(pausedCompletedFetches)}}. This could leave a brief
window where the paused partition is included in neither {{nextInLineFetch}}
nor {{completedFetches}}. If the background thread kicks in during that window,
it could fetch another chunk for that partition and add the response
to {{FetchBuffer}}. This would violate the assumption that there is no more than
one pending {{CompletedFetch}} per partition in {{FetchBuffer}}, and could cause
records to be returned out of offset order, or duplicate records to be returned.
{quote}
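A minimal sketch of one way to close that window, written against hypothetical classes ({{SketchFetch}}, {{SketchFetchBuffer}}) rather than the real consumer code. Instead of clearing {{nextInLineFetch}} and re-adding paused fetches later, the buffer moves a paused fetch into an internal parked list under the same lock, so the partition never stops counting as buffered. It assumes the background thread checks the buffer before adding a response; {{addIfAbsent}}, {{parkNextInLine}} and {{unparkAll}} are illustrative names, not existing Kafka methods.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for CompletedFetch: a partition name plus its first offset.
final class SketchFetch {
    final String partition;
    final long offset;

    SketchFetch(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical, simplified buffer; this is not Kafka's FetchBuffer API.
final class SketchFetchBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<SketchFetch> completedFetches = new ArrayDeque<>();
    // Paused fetches set aside during one collect pass; they still count as buffered.
    private final ArrayDeque<SketchFetch> parkedFetches = new ArrayDeque<>();
    private SketchFetch nextInLineFetch;

    // Background thread: refuse a second pending fetch for the same partition.
    boolean addIfAbsent(SketchFetch fetch) {
        lock.lock();
        try {
            if (bufferedPartitionsLocked().contains(fetch.partition)) {
                return false;
            }
            completedFetches.add(fetch);
            return true;
        } finally {
            lock.unlock();
        }
    }

    void setNextInLineFetch(SketchFetch fetch) {
        lock.lock();
        try {
            nextInLineFetch = fetch;
        } finally {
            lock.unlock();
        }
    }

    // Application thread: move nextInLineFetch to the parked list in one critical
    // section, so the partition is never absent from the buffer.
    void parkNextInLine() {
        lock.lock();
        try {
            if (nextInLineFetch != null) {
                parkedFetches.add(nextInLineFetch);
                nextInLineFetch = null;
            }
        } finally {
            lock.unlock();
        }
    }

    // End of a collect pass: return parked fetches to the queue for a later poll.
    void unparkAll() {
        lock.lock();
        try {
            completedFetches.addAll(parkedFetches);
            parkedFetches.clear();
        } finally {
            lock.unlock();
        }
    }

    Set<String> bufferedPartitions() {
        lock.lock();
        try {
            return bufferedPartitionsLocked();
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock.
    private Set<String> bufferedPartitionsLocked() {
        Set<String> result = new HashSet<>();
        if (nextInLineFetch != null) {
            result.add(nextInLineFetch.partition);
        }
        for (SketchFetch f : completedFetches) {
            result.add(f.partition);
        }
        for (SketchFetch f : parkedFetches) {
            result.add(f.partition);
        }
        return result;
    }
}
```

In {{collectFetch}} terms, {{parkNextInLine()}} would stand in for adding to the local {{pausedCompletedFetches}} queue plus {{setNextInLineFetch(null)}}, and {{unparkAll()}} for the final {{addAll}}.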
Second:
{quote}The second existing issue is with the
{{fetchBuffer.setNextInLineFetch}} call in {{collectFetch}}. After all records
are drained from {{nextInLineFetch}}, we only call
{{setNextInLineFetch}} when there is a new {{completedFetch}}. However,
until the drained {{completedFetch}} is removed from {{nextInLineFetch}},
the background thread can't fetch the next chunk for that partition. So it seems
that we will just be stuck here.
{quote}
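The stuck state can be illustrated with a single-threaded sketch, assuming the background thread skips any partition that is still reported as buffered. The hypothetical {{DrainSketch.collect}} clears a drained {{nextInLineFetch}} even when no replacement is queued, so its partition becomes fetchable again. {{DrainableFetch}} and {{DrainSketch}} are illustrative classes, not Kafka code, and locking is left out to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for CompletedFetch: a partition and its undrained offsets.
final class DrainableFetch {
    final String partition;
    final ArrayDeque<Long> remaining = new ArrayDeque<>();

    DrainableFetch(String partition, long... offsets) {
        this.partition = partition;
        for (long offset : offsets) {
            remaining.add(offset);
        }
    }

    boolean isConsumed() {
        return remaining.isEmpty();
    }
}

// Hypothetical single-threaded model of the collect loop (locking omitted for brevity).
final class DrainSketch {
    final ArrayDeque<DrainableFetch> completedFetches = new ArrayDeque<>();
    DrainableFetch nextInLineFetch;

    List<Long> collect(int maxRecords) {
        List<Long> out = new ArrayList<>();
        while (out.size() < maxRecords) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
                // Replace a drained fetch unconditionally: poll() returns null when the
                // queue is empty, which clears the slot instead of leaving it occupied.
                nextInLineFetch = completedFetches.poll();
                if (nextInLineFetch == null) {
                    break;
                }
            } else {
                out.add(nextInLineFetch.remaining.poll());
            }
        }
        // The loop can also stop on maxRecords right after draining the last record.
        if (nextInLineFetch != null && nextInLineFetch.isConsumed()) {
            nextInLineFetch = null;
        }
        return out;
    }

    // Partitions the background thread would treat as still buffered.
    Set<String> bufferedPartitions() {
        Set<String> result = new HashSet<>();
        if (nextInLineFetch != null) {
            result.add(nextInLineFetch.partition);
        }
        for (DrainableFetch f : completedFetches) {
            result.add(f.partition);
        }
        return result;
    }
}
```

The post-loop check matters: without it, a fetch drained by exactly {{maxRecords}} records would stay in {{nextInLineFetch}} and keep blocking its partition.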
Third:
{quote}Currently, {{fetchBuffer.setNextInLineFetch}} and
{{fetchBuffer.poll}} are separate operations, and we expect the caller to call
them in the right order to avoid a partition going missing from {{FetchBuffer}}
during the transition phase. That still leaves the possibility that a partition
is in both {{completedFetches}} and {{nextInLineFetch}} at the same time. It's
not a problem for now, but it may be in the future. Could we make them an atomic
operation? If not, could we add a comment documenting the correct usage of the
API and the impact of a partition being duplicated in {{completedFetches}} and
{{nextInLineFetch}}?
{quote}
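One way to make the hand-off atomic, sketched under the assumption that both fields are guarded by a single lock: a hypothetical {{pollIntoNextInLine()}} method performs the poll and the assignment in one critical section. The {{AtomicHandoffBuffer}} class below tracks partition names only and is not Kafka's {{FetchBuffer}}.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical buffer tracking partitions only; CompletedFetch payloads are omitted.
final class AtomicHandoffBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<String> completedFetches = new ArrayDeque<>();
    private String nextInLineFetch;

    void add(String partition) {
        lock.lock();
        try {
            completedFetches.add(partition);
        } finally {
            lock.unlock();
        }
    }

    // Combines poll() and setNextInLineFetch(...) into one critical section, so no
    // other thread can observe the partition missing from, or present twice in,
    // the buffer. Intended to be called only once the current nextInLineFetch is
    // drained, because the old value is overwritten.
    String pollIntoNextInLine() {
        lock.lock();
        try {
            nextInLineFetch = completedFetches.poll();
            return nextInLineFetch;
        } finally {
            lock.unlock();
        }
    }

    // Number of places a partition occupies; the intended invariant is at most one.
    int occurrences(String partition) {
        lock.lock();
        try {
            int count = partition.equals(nextInLineFetch) ? 1 : 0;
            for (String p : completedFetches) {
                if (p.equals(partition)) {
                    count++;
                }
            }
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```

With the two calls kept separate, poll-then-set opens a "missing" window and set-then-poll opens a "duplicated" window; a single locked method avoids both.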
was:
The interaction between {{FetchBuffer}}, {{FetchCollector}}, and
{{CompletedFetch}} is awkward, to say the least.
Per [~junrao]'s comments:
{quote}Currently, {{fetchBuffer.setNextInLineFetch}} and {{fetchBuffer.poll}}
are separate operations, and we expect the caller to call them in the right
order to avoid a partition going missing from {{FetchBuffer}} during the
transition phase. That still leaves the possibility that a partition is in both
{{completedFetches}} and {{nextInLineFetch}} at the same time. It's not a problem
for now, but it may be in the future. Could we make them an atomic operation?
If not, could we add a comment documenting the correct usage of the API and the
impact of a partition being duplicated in {{completedFetches}} and {{nextInLineFetch}}?
{quote}
> Refactor CompletedFetch initialization
> --
>
> Key: KAFKA-15640
> URL: https://issues.apache.org/jira/browse/KAFKA-15640
> Project: Kafka
> Issue Type: Improvement
> Components: clients, consumer
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Minor