ShivsundarR commented on code in PR #19886:
URL: https://github.com/apache/kafka/pull/19886#discussion_r2530061855
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
                 asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                 AuthenticationException authenticationException = client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
+            } else if (u.node.isEmpty() && onClose) {
+                log.debug("Removing unsent request {} because the client is closing", u);
+                iter.remove();
+                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
+                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Review Comment:
Hi @lianetm, thanks for the review.
> with this we're discarding any request we may have on close, if the node
> is disconnected, right?
Not really. We still allow time for all pending requests (such as
`commitSync`, acknowledgement callbacks, or `findCoordinator`) to complete.
`onClose` is true only once we have finished waiting in those steps and
reached the step that closes the network thread itself.
https://github.com/apache/kafka/blob/0f4dbf7fd76d0fb29fec1e3588399d67628d278f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L979-L981
--------------
> But still thinking, could we be wrongly dropping other relevant requests
> on close, like close fetch sessions?
As mentioned above and in this comment,
https://github.com/apache/kafka/pull/19886#issuecomment-2965330373, the boolean
`onClose` is true only after we have completed all the required steps
(updating callbacks, closing out sessions, sending
`stopFindCoordinatorOnCloseEvent`, etc.).
So at this stage there are no other requests that we would be waiting for.
Any request that was pending has either had its timer expire or received a
"broker disconnected" response, which is why this stage of closing the
network thread was reached.
Even if a broker came back up now and responded to a previous request
(let's say a commit response), we would no longer update callbacks (or any
background events), because the application thread has finished all
processing and reached the stage of closing the application event handler
itself.
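To make the new branch concrete, here is a minimal, simplified sketch of the purge it performs. It is not the real `NetworkClientDelegate`: the class `UnsentQueueSketch`, the `Unsent` type, and the `failed` flag are hypothetical stand-ins, and the flag merely takes the place of calling `u.handler.onFailure(...)` with `Errors.NETWORK_EXCEPTION`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the unsent-request queue (not Kafka code).
class UnsentQueueSketch {
    // Minimal unsent request: a name, an optional target node, and a failure marker.
    static final class Unsent {
        final String name;
        final Optional<String> node;
        boolean failed = false;

        Unsent(String name, Optional<String> node) {
            this.name = name;
            this.node = node;
        }
    }

    final List<Unsent> unsent = new ArrayList<>();

    // Models only the branch added in the diff: when closing, requests that
    // have no target node are removed from the queue and failed.
    void checkDisconnects(boolean onClose) {
        Iterator<Unsent> iter = unsent.iterator();
        while (iter.hasNext()) {
            Unsent u = iter.next();
            if (!u.node.isPresent() && onClose) {
                iter.remove();
                u.failed = true; // stands in for onFailure(..., NETWORK_EXCEPTION)
            }
        }
    }

    public static void main(String[] args) {
        UnsentQueueSketch q = new UnsentQueueSketch();
        q.unsent.add(new Unsent("findCoordinator", Optional.empty()));
        q.unsent.add(new Unsent("commit", Optional.of("broker-1")));

        q.checkDisconnects(false);
        System.out.println(q.unsent.size()); // prints 2: nothing is dropped before close

        q.checkDisconnects(true);
        System.out.println(q.unsent.size()); // prints 1: only the node-less request is dropped
    }
}
```

In this model a request that already has a target node is untouched by the branch, and nothing is dropped while `onClose` is false.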
----------------
> The ClassicConsumer (and Async before this PR) give those the full
> closeTimeout
The `ClassicConsumer` does give them the full closeTimeout, but only if the
coordinator isn't null.
https://github.com/apache/kafka/blob/b58aae288fb794b01fd2fc6a29748eba1d277bd6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1143-L1147
I tried this out locally: if we shut down the broker first, the coordinator
here is null, so the consumer closes immediately.
I also saw the log message below when the consumer closed, so there is a
check for disconnected nodes there too.
https://github.com/apache/kafka/blob/0f4dbf7fd76d0fb29fec1e3588399d67628d278f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L406-L408
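As a rough illustration of the `ClassicConsumer` behaviour described above, the following sketch models only the decision "wait up to closeTimeout if a coordinator is known, otherwise do not wait". The class and method names are hypothetical and this is not the code in `AbstractCoordinator`.

```java
// Hypothetical model of the close-time wait decision (not Kafka code).
class CoordinatorCloseSketch {
    // Upper bound, in milliseconds, on how long close waits for pending
    // requests. A null coordinator models "coordinator unknown", for
    // example because the broker was shut down first.
    static long maxCloseWaitMs(String coordinator, long closeTimeoutMs) {
        if (coordinator == null) {
            return 0L; // nothing to wait on, so close returns immediately
        }
        return closeTimeoutMs; // pending requests get up to the full timeout
    }

    public static void main(String[] args) {
        System.out.println(maxCloseWaitMs("broker-1", 30_000L)); // prints 30000
        System.out.println(maxCloseWaitMs(null, 30_000L));       // prints 0
    }
}
```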
-------------------
> Would that work? (and by "closing" I refer to the actual var inside the
> CoordReqMgr, that is only flipped after committing)
- I don't think so, as this variable is set to true only on receiving
`StopFindCoordinatorOnCloseEvent`.
- If a broker was shut down before the client was shut down, then for both
`AsyncConsumer` and `ShareConsumer` the unsent findCoordinator request (to a
"null" node) still lingers in `NetworkClientDelegate` and is retried until
the timeout, so the `stopFindCoordinatorOnCloseEvent` has no effect.
So the `whenComplete` is reached only after the 30-second timeout, and this
would not help terminate the consumer as soon as we hit Ctrl-C.
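The timing argument above can be sketched with a small simulation. This is a simplified model under stated assumptions, not Kafka code: `LingeringRequestSketch`, `completionTimeMs`, and the poll interval are hypothetical, and the 30-second figure is the timeout mentioned above.

```java
// Hypothetical simulation of an unsent request that has no target node (not Kafka code).
class LingeringRequestSketch {
    // Returns the simulated time (ms) at which the unsent request completes.
    // Without the close-time purge it stays queued until the timeout expires;
    // with the purge it is failed on the first pass.
    static long completionTimeMs(long requestTimeoutMs, long pollIntervalMs, boolean purgeOnClose) {
        long now = 0L;
        while (now < requestTimeoutMs) {
            if (purgeOnClose) {
                return now; // removed and failed immediately when closing
            }
            now += pollIntervalMs; // no node available, so the request stays queued
        }
        return now; // timer expired
    }

    public static void main(String[] args) {
        System.out.println(completionTimeMs(30_000L, 100L, false)); // prints 30000
        System.out.println(completionTimeMs(30_000L, 100L, true));  // prints 0
    }
}
```

In this model a flag that only stops new findCoordinator requests from being created does not change the first result, because the request is already sitting in the queue.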