kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1344897098


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##########
@@ -37,7 +36,7 @@
  *
  * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread.
  */
-public class FetchBuffer implements Closeable {
+public class FetchBuffer implements AutoCloseable {

Review Comment:
   Yes, the process is a bit convoluted...
   
   To move the fetched records from the background thread to the application thread and then on to the user, `PrototypeAsyncConsumer` has these three instance variables:
   
   1. `fetchResults`
   2. `fetchBuffer`
   3. `fetchCollector`
   
   All three of those objects are created in the application thread when the 
`PrototypeAsyncConsumer` is created. `fetchBuffer` and `fetchCollector` are 
only ever referenced by the application thread; `fetchResults`, however, is 
used by **both** threads.
   
   `fetchResults` is referenced in the background thread when it is used in the 
`FetchEvent` callback in the `sendFetches()` method:
   
   ```java
   private void sendFetches() {
       FetchEvent event = new FetchEvent();
       applicationEventHandler.add(event);
   
       event.future().whenComplete((completedFetches, error) -> {
           fetchResults.addAll(completedFetches);
       });
   }
   ```
   
   Since the callback passed to `whenComplete()` runs when the background thread completes the `Future`, `fetchResults` is modified on the background thread.
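   
   To illustrate that threading behavior outside of the consumer, here is a minimal standalone sketch. It is not code from this PR: the class name, the thread name, and the `"records"` payload are made up for illustration, and it uses a plain `CompletableFuture` as a stand-in for the event's future.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical demo class; not part of the Kafka code base.
   public class WhenCompleteThreadDemo {
   
       // Returns the name of the thread that ran the whenComplete() callback.
       static String callbackThreadName() {
           CompletableFuture<String> future = new CompletableFuture<>();
           AtomicReference<String> callbackThread = new AtomicReference<>();
   
           // Registered on the calling ("application") thread while the future is still pending.
           future.whenComplete((value, error) ->
                   callbackThread.set(Thread.currentThread().getName()));
   
           // Stand-in for the background thread that completes the future.
           Thread background = new Thread(() -> future.complete("records"), "background-thread");
           background.start();
           try {
               background.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while waiting for the background thread", e);
           }
   
           return callbackThread.get();
       }
   
       public static void main(String[] args) {
           System.out.println(callbackThreadName());
       }
   }
   ```
   
   Note that this only holds because the callback is registered before the future completes. If the future were already complete when `whenComplete()` was called, the callback would run on the calling thread instead.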
   
   The rest of the process should occur on the application thread.
   
   During calls to `poll()` on the application thread, data from `fetchResults` 
is moved to `fetchBuffer` in `pollForFetches()`:
   
   ```java
   while (pollTimer.notExpired()) {
       CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS);
   
       if (completedFetch != null)
           fetchBuffer.add(completedFetch);
   
       pollTimer.update();
   }
   ```
   
   The data in `fetchBuffer` is later extracted by `fetchCollector` during the `poll()` process, but this again happens on the application thread.
   
   This roundabout way of getting the data is deliberate: it ensures that we don't inadvertently write to the `FetchBuffer` from the background thread. Hence this Javadoc comment for `fetchResults`:
   
   ```java
   /**
    * A thread-safe {@link BlockingQueue queue} for the results that are populated in the background thread
    * when the fetch results are available. Because the {@link #fetchBuffer fetch buffer} is not thread-safe, we
    * need to separate the results collection that we provide to the background thread from the collection that
    * we read from on the application thread.
    */
   private final BlockingQueue<CompletedFetch> fetchResults = new LinkedBlockingQueue<>();
   ```
   
   This is a rough idea of what happens on the background thread:
   
   > `FetchRequestManager` -> `fetchResults`
   
   Then later in the application thread during `poll()`:
   
   > `fetchResults` -> `fetchBuffer` -> `fetchCollector`
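   
   As a rough, self-contained sketch of that hand-off pattern (not the actual consumer code): the class and method names below are hypothetical, `String` stands in for `CompletedFetch`, and a plain `ArrayDeque` stands in for the non-thread-safe `FetchBuffer`.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical sketch; names and types are illustrative only.
   public class FetchHandoffSketch {
   
       // Shared by both threads, so it must be thread-safe (stands in for fetchResults).
       private final BlockingQueue<String> results = new LinkedBlockingQueue<>();
   
       // Touched only by the application thread, so a plain collection suffices (stands in for fetchBuffer).
       private final Queue<String> buffer = new ArrayDeque<>();
   
       // Called from the background thread when fetch results are available.
       void onFetchCompleted(List<String> completed) {
           results.addAll(completed);
       }
   
       // Called from the application thread: results -> buffer -> caller.
       List<String> poll(long timeoutMs) throws InterruptedException {
           String first = results.poll(timeoutMs, TimeUnit.MILLISECONDS);
           if (first != null) {
               buffer.add(first);
               results.drainTo(buffer); // move anything else that is already queued
           }
           List<String> collected = new ArrayList<>(buffer);
           buffer.clear();
           return collected;
       }
   
       // Runs one background "fetch" and one application-thread poll.
       static List<String> demo() {
           FetchHandoffSketch sketch = new FetchHandoffSketch();
           Thread background = new Thread(
                   () -> sketch.onFetchCompleted(Arrays.asList("fetch-1", "fetch-2")),
                   "background-thread");
           background.start();
           try {
               background.join();
               return sketch.poll(100);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted during the demo", e);
           }
       }
   
       public static void main(String[] args) {
           System.out.println(demo());
       }
   }
   ```
   
   The only object the background thread ever touches is the blocking queue; the buffer never leaves the application thread, which is the property the real code is trying to preserve.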
   
   Let me know if that makes sense or if there is still a gap that I'm not 
seeing. I can write the above up (with any changes you'd like) in code comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
