kevin-wu24 commented on code in PR #21619:
URL: https://github.com/apache/kafka/pull/21619#discussion_r2880308894
##########
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##########
@@ -267,6 +267,7 @@ class ControllerRegistrationManager(
}
override def onTimeout(): Unit = {
+ pendingRpc = false
Review Comment:
> How about the response sent to the broker? The response sent to the broker
> is important since that is what stops the broker from rescheduling registration
> requests.
The registering controller looks for its own
`ControllerRegistrationRecord` in the metadata log to decide when to stop
rescheduling requests. The `ControllerRegistrationManager` is a `MetadataPublisher`,
and its `onMetadataUpdate` appends an event to its event queue. Once the
registering controller sees its `ControllerRegistrationRecord` in the metadata
log, it stops sending requests. When the registering controller receives a
response from the active controller, it logs a message and updates some internal
state. Importantly, it does not schedule another request. Instead, the
registering controller only re-enters `maybeSendControllerRegistration` when it
receives a metadata update.
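To make that flow concrete, here is a minimal single-threaded model. `RegistrationModel` and its methods are hypothetical simplifications that only echo the names above; they are not Kafka's `ControllerRegistrationManager`, and the real `onMetadataUpdate` enqueues an event rather than running the logic inline.

```java
// Hypothetical single-threaded model of the controller registration flow.
// Names echo ControllerRegistrationManager, but this is NOT Kafka's code.
class RegistrationModel {
    boolean registeredInLog = false; // our ControllerRegistrationRecord has been seen
    boolean pendingRpc = false;      // a registration RPC is outstanding
    int requestsSent = 0;

    // Re-entered on every metadata update; sends only if needed and allowed.
    void maybeSendControllerRegistration() {
        if (registeredInLog || pendingRpc) {
            return; // already registered, or still waiting on the previous RPC
        }
        pendingRpc = true;
        requestsSent++;
    }

    // Response from the active controller: update state, but do NOT resend.
    void onResponse() {
        pendingRpc = false;
    }

    // MetadataPublisher hook. The real class appends an event to its queue;
    // this model runs the same logic inline.
    void onMetadataUpdate(boolean containsOwnRegistrationRecord) {
        if (containsOwnRegistrationRecord) {
            registeredInLog = true;
        }
        maybeSendControllerRegistration();
    }

    public static void main(String[] args) {
        RegistrationModel m = new RegistrationModel();
        m.onMetadataUpdate(false); // first update: sends request #1
        m.onResponse();            // the response alone triggers nothing
        m.onMetadataUpdate(true);  // our record is in the log: stop sending
        m.onMetadataUpdate(false); // later updates send nothing more
        System.out.println("requests sent: " + m.requestsSent); // prints "requests sent: 1"
    }
}
```

The key point the model captures is that only `onMetadataUpdate` drives retries; the response handler just clears state.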
> I was trying to affirm that if onTimeout is called, it means that the
> request was not sent and will not be sent.
Yeah, I see that in `NodeToControllerRequestThread#generateRequests`, we call
`request.callback().onTimeout()` instead of adding the request to
`InterBrokerSendThread#unsentRequests`. So when this callback is invoked, the
registration request has not been sent over the wire and never will be.
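A minimal sketch of that expiry check, assuming a simplified queue. `QueueItem` and `GenerateRequestsSketch` are hypothetical stand-ins for `NodeToControllerQueueItem` and `NodeToControllerRequestThread`, and handing off at most one live request per call is a simplification of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

// Hypothetical stand-in for NodeToControllerQueueItem.
class QueueItem {
    final long createdTimeMs;
    boolean timedOut = false;

    QueueItem(long createdTimeMs) {
        this.createdTimeMs = createdTimeMs;
    }

    // With this PR, the registration handler's onTimeout clears pendingRpc here.
    void onTimeout() {
        timedOut = true;
    }
}

// Simplified sketch of the expiry check in generateRequests; not Kafka's code.
class GenerateRequestsSketch {
    // Expired items are removed and get onTimeout(), so they never reach the wire.
    // Otherwise, if a controller address is known, the first live item is
    // dequeued and returned for sending.
    static Optional<QueueItem> generateRequests(Deque<QueueItem> queue, long nowMs,
                                                long retryTimeoutMs, boolean controllerKnown) {
        Iterator<QueueItem> it = queue.iterator();
        while (it.hasNext()) {
            QueueItem item = it.next();
            if (nowMs - item.createdTimeMs >= retryTimeoutMs) {
                it.remove();
                item.onTimeout();
            } else if (controllerKnown) {
                it.remove();
                return Optional.of(item);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Deque<QueueItem> queue = new ArrayDeque<>();
        QueueItem stale = new QueueItem(0);
        QueueItem fresh = new QueueItem(90);
        queue.add(stale);
        queue.add(fresh);
        Optional<QueueItem> sent = generateRequests(queue, 100, 50, true);
        System.out.println("stale timed out: " + stale.timedOut);          // true
        System.out.println("fresh sent: " + (sent.orElse(null) == fresh)); // true
    }
}
```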
As for the case where the request is sent over the wire (i.e. in
`InterBrokerSendThread#sendRequests`) but no response arrives before the
request times out, I see this code in `NetworkClient`:
```java
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.info("Disconnecting from node {} due to request timeout.", nodeId);
        processTimeoutDisconnection(responses, nodeId, now);
    }
}
```
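For reference, a simplified model of that timeout path: find nodes with an over-age in-flight request, drop the connection, and hand back a disconnected response for each request that was in flight to that node. `TimeoutSketch` and its `Response` type are hypothetical stand-ins for `NetworkClient`, `InFlightRequests` and `ClientResponse`; the strict `>` comparison and failing every in-flight request to the node are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the request-timeout path; not Kafka's code.
class TimeoutSketch {
    // Stand-in for ClientResponse: only the fields this discussion needs.
    static final class Response {
        final String nodeId;
        final boolean wasDisconnected;

        Response(String nodeId, boolean wasDisconnected) {
            this.nodeId = nodeId;
            this.wasDisconnected = wasDisconnected;
        }
    }

    final long requestTimeoutMs;
    // nodeId -> send times (ms) of requests still awaiting a response
    final Map<String, List<Long>> inFlight = new LinkedHashMap<>();

    TimeoutSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Nodes with at least one in-flight request older than the timeout.
    List<String> nodesWithTimedOutRequests(long now) {
        List<String> nodes = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : inFlight.entrySet()) {
            for (long sentMs : e.getValue()) {
                if (now - sentMs > requestTimeoutMs) {
                    nodes.add(e.getKey());
                    break;
                }
            }
        }
        return nodes;
    }

    // "Closing" a connection fails every request still in flight to that
    // node with a disconnected response, which the caller then handles.
    void handleTimedOutRequests(List<Response> responses, long now) {
        for (String nodeId : nodesWithTimedOutRequests(now)) {
            for (long ignored : inFlight.remove(nodeId)) {
                responses.add(new Response(nodeId, true));
            }
        }
    }
}
```

The disconnected responses produced here are what a caller like `handleResponse` would then see via `wasDisconnected()`.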
I believe this results in the following branch of
`NodeToControllerRequestThread#handleResponse` executing:
```java
} else if (response.wasDisconnected()) {
    updateControllerAddress(null);
    requestQueue.addFirst(queueItem);
}
```
So the thread will not send anything over the wire again until it has a new
controller address. Importantly, we add the same `NodeToControllerQueueItem`
reference back onto the request queue, so it keeps its original `createdTimeMs`.
If we stay in this case long enough (i.e. send request X over the wire,
disconnect, re-enqueue X), we eventually call
`NodeToControllerRequestThread#generateRequests()` after request X has timed
out, and our `onTimeout()` callback is invoked:
```java
if (currentTimeMs - request.createdTimeMs() >= retryTimeoutMs) {
    requestIter.remove();
    request.callback().onTimeout();
}
```
With my change, `onTimeout()` sets `pendingRpc = false`, so the controller can
send another `ControllerRegistrationRequest` the next time
`maybeSendControllerRegistration` runs.
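To sanity-check the whole argument, here is a small simulation, assuming every send ends in a disconnect. `RetrySimulation` and its `Item` class are hypothetical; they only mirror the `pendingRpc` flag, the re-enqueue-at-head behavior, and the `createdTimeMs`-based expiry discussed above, with arbitrary millisecond values.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical end-to-end simulation of the retry path; not Kafka's code.
class RetrySimulation {
    boolean pendingRpc = false;
    int registrationsSent = 0;
    final Deque<Item> queue = new ArrayDeque<>();

    // Stand-in for a NodeToControllerQueueItem carrying the registration request.
    final class Item {
        final long createdTimeMs;

        Item(long createdTimeMs) {
            this.createdTimeMs = createdTimeMs;
        }

        void onTimeout() {
            pendingRpc = false; // the one-line change under review
        }
    }

    void maybeSendControllerRegistration(long nowMs) {
        if (pendingRpc) {
            return; // previous RPC still outstanding
        }
        pendingRpc = true;
        registrationsSent++;
        queue.addLast(new Item(nowMs));
    }

    // Every send ends in a disconnect, so the same item keeps returning to
    // the head of the queue until it is older than retryTimeoutMs.
    void run(long retryTimeoutMs, long tickMs, long endMs) {
        for (long now = 0; now <= endMs; now += tickMs) {
            Item head = queue.pollFirst();
            if (head == null) {
                break;
            }
            if (now - head.createdTimeMs >= retryTimeoutMs) {
                head.onTimeout();     // expired: callback instead of a send
            } else {
                queue.addFirst(head); // sent, disconnected, re-enqueued
            }
        }
    }

    public static void main(String[] args) {
        RetrySimulation sim = new RetrySimulation();
        sim.maybeSendControllerRegistration(0);
        sim.run(100, 30, 150);
        sim.maybeSendControllerRegistration(150); // allowed again: pendingRpc was cleared
        System.out.println("registrations sent: " + sim.registrationsSent); // prints "registrations sent: 2"
    }
}
```

Without the `pendingRpc = false` line in `Item#onTimeout`, the final `maybeSendControllerRegistration` call would return early and only one registration would ever be sent.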