kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1344897098

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -37,7 +36,7 @@
  *
  * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread.
  */
-public class FetchBuffer implements Closeable {
+public class FetchBuffer implements AutoCloseable {

Review Comment:
Yes, the process is a bit convoluted...

To perform the process of moving the fetched records from the background thread to the application thread and then on to the user, `PrototypeAsyncConsumer` has these three instance variables:

1. `fetchResults`
2. `fetchBuffer`
3. `fetchCollector`

All three of those objects are created in the application thread when the `PrototypeAsyncConsumer` is created. `fetchBuffer` and `fetchCollector` are only ever referenced by the application thread; `fetchResults`, however, is used by **both** threads.

`fetchResults` is referenced in the background thread when it is used in the `FetchEvent` callback in the `sendFetches()` method:

```java
private void sendFetches() {
    FetchEvent event = new FetchEvent();
    applicationEventHandler.add(event);

    event.future().whenComplete((completedFetches, error) -> {
        fetchResults.addAll(completedFetches);
    });
}
```

Since the `whenComplete()` method is executed when the background thread "completes" the `Future`, `fetchResults` is thus modified on the background thread.

The rest of the process should occur on the application thread. During calls to `poll()` on the application thread, data from `fetchResults` is moved to `fetchBuffer` in `pollForFetches()`:

```java
while (pollTimer.notExpired()) {
    CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS);

    if (completedFetch != null)
        fetchBuffer.add(completedFetch);

    pollTimer.update();
}
```

The data in `fetchBuffer` is later extracted in `fetchCollector` during the `poll()` process, but this again is on the application thread.

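To make the handoff concrete, here is a minimal, self-contained sketch of the same pattern. It is not the consumer code: `HandoffSketch`, `completeFetches()`, and `run()` are hypothetical names, plain strings stand in for `CompletedFetch`, and an `ArrayDeque` stands in for the non-thread-safe `FetchBuffer`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer; strings stand in for CompletedFetch.
class HandoffSketch {
    // Thread-safe: written by the background thread, read by the application thread.
    private final BlockingQueue<String> fetchResults = new LinkedBlockingQueue<>();
    // Not thread-safe: only ever touched by the application thread.
    private final Queue<String> fetchBuffer = new ArrayDeque<>();

    // Runs on the background thread; fetchResults is the only structure it touches.
    void completeFetches(int count) {
        for (int i = 0; i < count; i++)
            fetchResults.add("fetch-" + i);
    }

    // Runs on the application thread: move results into the buffer until
    // `expected` items have arrived or the timeout elapses, then drain the buffer.
    List<String> pollForFetches(int expected, long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (fetchBuffer.size() < expected) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0)
                break;
            String completedFetch = fetchResults.poll(remainingNs, TimeUnit.NANOSECONDS);
            if (completedFetch != null)
                fetchBuffer.add(completedFetch);
        }
        // Stand-in for fetchCollector: read the buffer on the application thread only.
        List<String> collected = new ArrayList<>(fetchBuffer);
        fetchBuffer.clear();
        return collected;
    }

    // Starts a "background" thread as the producer and polls on the calling thread.
    static List<String> run(int count) {
        HandoffSketch sketch = new HandoffSketch();
        Thread background = new Thread(() -> sketch.completeFetches(count), "background");
        background.start();
        try {
            List<String> collected = sketch.pollForFetches(count, 5_000);
            background.join();
            return collected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The point of the sketch is that the background thread never sees `fetchBuffer`; the only shared object is the `BlockingQueue`, so the buffer itself needs no synchronization.
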
This roundabout way of getting the data is done specifically so that we don't inadvertently write to the `FetchBuffer` from the background thread. Hence this Javadoc comment for `fetchResults`:

```java
/**
 * A thread-safe {@link BlockingQueue queue} for the results that are populated in the background thread
 * when the fetch results are available. Because the {@link #fetchBuffer fetch buffer} is not thread-safe, we
 * need to separate the results collection that we provide to the background thread from the collection that
 * we read from on the application thread.
 */
private final BlockingQueue<CompletedFetch> fetchResults = new LinkedBlockingQueue<>();
```

This is a rough idea of what happens on the background thread:

> `FetchRequestManager` -> `fetchResults`

Then later in the application thread during `poll()`:

> `fetchResults` -> `fetchBuffer` -> `fetchCollector`

Let me know if that makes sense or if there is still a gap that I'm not seeing. I can write the above up (with any changes you'd like) in code comments.