kirktrue commented on code in PR #14359: URL: https://github.com/apache/kafka/pull/14359#discussion_r1325174579
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -91,23 +105,70 @@ public void onFailure(RuntimeException e) {
                 }
             };
 
-            final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
+            final RequestFuture<ClientResponse> future = nodeStatusDetector.send(fetchTarget, request);
             future.addListener(listener);
         }
 
         return fetchRequestMap.size();
     }
 
-    public void close(final Timer timer) {
-        if (!isClosed.compareAndSet(false, true)) {
-            log.info("Fetcher {} is already closed.", this);
-            return;
+    public Fetch<K, V> collectFetch() {
+        return fetchCollector.collectFetch(fetchBuffer);
+    }
+
+    protected void maybeCloseFetchSessions(final Timer timer) {

Review Comment:
   Yes, it is a bit obscure. My apologies.

   There are two reasons that `maybeCloseFetchSessions` exists in `Fetcher`:

   1. The forthcoming `FetchRequestManager` doesn't close sessions in the same way, so it doesn't make sense to have the method in the shared `AbstractFetch` class.
   2. `FetchRequestManager` (as with the other `RequestManager`s) doesn't use the `ConsumerNetworkClient` but uses the `NetworkClientDelegate`.

   The code in `AbstractFetch` that is shared by both `Fetcher` and, soon, `FetchRequestManager` needed a way to call some of the methods that `ConsumerNetworkClient` and `NetworkClientDelegate` have in common. Thus the `NodeStatusDetector` interface was created.

   Because the `FetchRequestManager` isn't in this PR, the reason for the current structure is understandably unclear. For now I've made the following changes:

   1. I moved `maybeCloseFetchSessions` to `AbstractFetch`
   2. I left `AbstractFetch` with a reference to a `ConsumerNetworkClient`
   3. I've removed the `NodeStatusDetector`

   As needed, the above may be changed in forthcoming PRs.
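   To illustrate the structure described above, here is a minimal sketch of a shared class talking to two different network clients through one small interface. It is not the code from this PR: every type and method name below (`NodeStatusDetectorSketch`, `DirectClientSketch`, `DelegateClientSketch`, `SharedFetchSketch`, `isUnavailable`) is a hypothetical stand-in, and the real Kafka classes have different signatures. Only the idea of routing `send` through a common interface comes from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical interface: the subset of operations both network clients share.
interface NodeStatusDetectorSketch {
    boolean isUnavailable(String nodeId);
    void send(String nodeId, String request);
}

// Stand-in for a ConsumerNetworkClient-style client (not the real Kafka class).
final class DirectClientSketch implements NodeStatusDetectorSketch {
    private final Set<String> unavailable = new HashSet<>();
    private final List<String> sent = new ArrayList<>();

    void markUnavailable(String nodeId) { unavailable.add(nodeId); }
    List<String> sent() { return sent; }

    @Override public boolean isUnavailable(String nodeId) { return unavailable.contains(nodeId); }
    @Override public void send(String nodeId, String request) { sent.add(nodeId + ":" + request); }
}

// Stand-in for a NetworkClientDelegate-style client that queues requests (assumed behavior).
final class DelegateClientSketch implements NodeStatusDetectorSketch {
    private final List<String> queued = new ArrayList<>();

    List<String> queued() { return queued; }

    @Override public boolean isUnavailable(String nodeId) { return false; }
    @Override public void send(String nodeId, String request) { queued.add(nodeId + ":" + request); }
}

// Shared logic in the role of AbstractFetch: it only knows the interface.
final class SharedFetchSketch {
    private final NodeStatusDetectorSketch detector;

    SharedFetchSketch(NodeStatusDetectorSketch detector) { this.detector = detector; }

    // Sends one fetch per available node and returns how many were sent.
    int sendFetches(List<String> nodeIds) {
        int count = 0;
        for (String nodeId : nodeIds) {
            if (detector.isUnavailable(nodeId)) {
                continue; // skip nodes the client reports as unavailable
            }
            detector.send(nodeId, "FETCH");
            count++;
        }
        return count;
    }
}

final class FetchSketchDemo {
    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("n1", "n2", "n3");

        DirectClientSketch direct = new DirectClientSketch();
        direct.markUnavailable("n2");
        System.out.println(new SharedFetchSketch(direct).sendFetches(nodes));

        DelegateClientSketch delegate = new DelegateClientSketch();
        System.out.println(new SharedFetchSketch(delegate).sendFetches(nodes));
    }
}
```

   The shared class never names a concrete client, which is the property the interface was meant to provide. Removing the interface and holding a concrete client reference, as done for now, trades that flexibility for a simpler PR.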
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org