kirktrue commented on code in PR #14359:
URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174579


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -91,23 +105,70 @@ public void onFailure(RuntimeException e) {
                 }
             };
 
-            final RequestFuture<ClientResponse> future = 
client.send(fetchTarget, request);
+            final RequestFuture<ClientResponse> future = 
nodeStatusDetector.send(fetchTarget, request);
             future.addListener(listener);
         }
 
         return fetchRequestMap.size();
     }
 
-    public void close(final Timer timer) {
-        if (!isClosed.compareAndSet(false, true)) {
-            log.info("Fetcher {} is already closed.", this);
-            return;
+    public Fetch<K, V> collectFetch() {
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    protected void maybeCloseFetchSessions(final Timer timer) {

Review Comment:
   Yes, it is a bit obtuse. My apologies.
   
   There are two reasons that `maybeCloseFetchSessions` exists in `Fetcher`:
   
   1. The forthcoming `FetchRequestManager` doesn't close sessions in the same 
way, so it doesn't make sense to have the method in the shared `AbstractFetch` 
class.
   2. `FetchRequestManager` (as with the other `RequestManager`s) doesn't use 
the `ConsumerNetworkClient` but uses the `NetworkClientDelegate`. The code in 
`AbstractFetch` that is shared by both `Fetcher` and—soon—`FetchRequestManager` 
needed a way to call some of the methods that `ConsumerNetworkClient` and 
`NetworkClientDelegate` have in common. Thus the `NodeStatusDetector` interface 
was created.
   
   Because the `FetchRequestManager` isn't in this PR, it's understandably 
unclear the reason for the current structure. For now I've made the following 
changes:
   
   1. I moved `maybeCloseFetchSessions` to `AbstractFetch`
   2. I left `AbstractFetch` with a reference to a `ConsumerNetworkClient`
   3. I've removed the `NodeStatusDetector`
   
   As needed, the above may be changed in forthcoming PRs.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -91,23 +105,70 @@ public void onFailure(RuntimeException e) {
                 }
             };
 
-            final RequestFuture<ClientResponse> future = 
client.send(fetchTarget, request);
+            final RequestFuture<ClientResponse> future = 
nodeStatusDetector.send(fetchTarget, request);
             future.addListener(listener);
         }
 
         return fetchRequestMap.size();
     }
 
-    public void close(final Timer timer) {
-        if (!isClosed.compareAndSet(false, true)) {
-            log.info("Fetcher {} is already closed.", this);
-            return;
+    public Fetch<K, V> collectFetch() {
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    protected void maybeCloseFetchSessions(final Timer timer) {

Review Comment:
   Yes, it is a bit obtuse. My apologies.
   
   There are two reasons that `maybeCloseFetchSessions` exists in `Fetcher`:
   
   1. The forthcoming `FetchRequestManager` doesn't close sessions in the same 
way, so it doesn't make sense to have the method in the shared `AbstractFetch` 
class.
   2. `FetchRequestManager` (as with the other `RequestManager`s) doesn't use 
the `ConsumerNetworkClient` but uses the `NetworkClientDelegate`. The code in 
`AbstractFetch` that is shared by both `Fetcher` and—soon—`FetchRequestManager` 
needed a way to call some of the methods that `ConsumerNetworkClient` and 
`NetworkClientDelegate` have in common. Thus the `NodeStatusDetector` interface 
was created.
   
   Because the `FetchRequestManager` isn't in this PR, it's understandably 
unclear the reason for the current structure. For now I've made the following 
changes:
   
   1. I moved `maybeCloseFetchSessions` to `AbstractFetch`
   2. I left `AbstractFetch` with a reference to a `ConsumerNetworkClient`
   3. I've removed the `NodeStatusDetector`
   
   As needed, the above may be changed in forthcoming PRs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to