kevin-wu24 commented on code in PR #21619:
URL: https://github.com/apache/kafka/pull/21619#discussion_r2880308894


##########
core/src/main/scala/kafka/server/ControllerRegistrationManager.scala:
##########
@@ -267,6 +267,7 @@ class ControllerRegistrationManager(
     }
 
     override def onTimeout(): Unit = {
+      pendingRpc = false

Review Comment:
   > How about the response sent to the broker? The response sent to the broker 
is important since that is what stops the broker from rescheduling registration 
requests.
   
   The registering controller looks for its own 
`ControllerRegistrationRecord` in the metadata log to decide when to stop 
rescheduling requests. The `ControllerRegistrationManager` is a 
`MetadataPublisher`, and its `onMetadataUpdate` appends an event on each 
update. Once the registering controller sees its 
`ControllerRegistrationRecord` in the metadata log, it stops sending requests. 
When the registering controller receives a response from the active 
controller, it logs a message and updates some internal state. Importantly, it 
does not schedule another request. Instead, the registering controller 
re-enters `maybeSendControllerRegistration` when it receives a metadata update.
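   
   To make that flow concrete, here is a minimal sketch of the gating logic as 
I understand it. `RegistrationGate` and its fields are hypothetical stand-ins, 
not the real `ControllerRegistrationManager`; it models only the 
successful-response path, and clearing `pendingRpc` on a response is my 
assumption about what "updates some internal state" covers.
   
   ```java
   /** Hypothetical, simplified model of the registration gating described above. */
   final class RegistrationGate {
       private boolean pendingRpc = false;       // a request is outstanding
       private boolean registeredInLog = false;  // our record was seen in the log
       int requestsSent = 0;

       /** Every metadata update re-enters the send check. */
       void onMetadataUpdate(boolean sawOwnRegistrationRecord) {
           if (sawOwnRegistrationRecord) {
               registeredInLog = true;
           }
           maybeSendControllerRegistration();
       }

       void maybeSendControllerRegistration() {
           if (registeredInLog) {
               return; // the record is in the metadata log: stop sending
           }
           if (pendingRpc) {
               return; // never issue a second request while one is outstanding
           }
           pendingRpc = true;
           requestsSent++;
       }

       /** A response only updates state; it does not schedule another request. */
       void onResponse() {
           pendingRpc = false; // assumption: part of the "internal state" update
       }
   }
   ```
   
   In this model, a `pendingRpc` flag that is never cleared blocks every later 
send, which is the situation the `onTimeout()` change is meant to avoid.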
   
   > I was trying to affirm that if onTimeout is called, it means that the 
request was not sent and will not be sent.
   
   Yeah, I see in `NodeToControllerRequestThread#generateRequests`, we call 
`request.callback().onTimeout()` instead of adding the request to 
`InterBrokerSendThread#unsentRequests`. This means that the registration 
request is not sent and will not be sent over the wire when this callback is 
invoked.
   
   As for the case where the request is sent over the wire (i.e. in 
`InterBrokerSendThread#sendRequests`) but no response is received before the 
request times out, I see this code in the `NetworkClient`:
   ```
   private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
       List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
       for (String nodeId : nodeIds) {
           // close connection to the node
           this.selector.close(nodeId);
           log.info("Disconnecting from node {} due to request timeout.", nodeId);
           processTimeoutDisconnection(responses, nodeId, now);
       }
   }
   ```
   To me, this should result in this code executing in 
`NodeToControllerRequestThread#handleResponse`:
   ```
   else if (response.wasDisconnected()) {
       updateControllerAddress(null);
       requestQueue.addFirst(queueItem);
   ```
   So the thread will not try to send a message over the wire again until after 
getting a new controller address. Importantly, we add the same 
`NodeToControllerQueueItem` reference back on the request queue, so it keeps 
its original `createdTimeMs`. If this cycle repeats for long enough (send 
request X over the wire, disconnect, re-enqueue X), a later call to 
`NodeToControllerRequestThread#generateRequests()` will find that request X 
has timed out, and our `onTimeout()` callback will be invoked:
   ```
   if (currentTimeMs - request.createdTimeMs() >= retryTimeoutMs) {
       requestIter.remove();
       request.callback().onTimeout();
   }
   ```
   With my code change, `onTimeout()` sets `pendingRpc = false`, so the 
controller can send another `ControllerRegistrationRequest` the next time it 
enters `maybeSendControllerRegistration`.
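   
   The send, disconnect, re-enqueue cycle above can be sketched roughly as 
follows. This is a toy model under my own assumptions, not the real 
`NodeToControllerRequestThread`: `RequestQueueSketch`, `PendingRpcSketch`, the 
30 ms timeout, and the 10 ms tick are all hypothetical and chosen only to keep 
the example short.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.Iterator;

   /** Hypothetical stand-in for the request queue handling discussed above. */
   final class RequestQueueSketch {
       interface Callback { void onTimeout(); }

       static final class QueueItem {
           final long createdTimeMs;
           final Callback callback;
           QueueItem(long createdTimeMs, Callback callback) {
               this.createdTimeMs = createdTimeMs;
               this.callback = callback;
           }
       }

       private final Deque<QueueItem> requestQueue = new ArrayDeque<>();
       private final long retryTimeoutMs;

       RequestQueueSketch(long retryTimeoutMs) { this.retryTimeoutMs = retryTimeoutMs; }

       void enqueue(QueueItem item) { requestQueue.addLast(item); }

       /** Expires aged-out items via onTimeout(), then returns the next item to send, or null. */
       QueueItem generateRequest(long currentTimeMs) {
           Iterator<QueueItem> it = requestQueue.iterator();
           while (it.hasNext()) {
               QueueItem item = it.next();
               it.remove();
               if (currentTimeMs - item.createdTimeMs >= retryTimeoutMs) {
                   item.callback.onTimeout(); // dropped: it will not be sent again
               } else {
                   return item;               // stands in for "sent over the wire"
               }
           }
           return null;
       }

       /** Disconnect path: the same item, with its original creation time, goes back first. */
       void onDisconnect(QueueItem item) { requestQueue.addFirst(item); }
   }

   /** Hypothetical caller that tracks pendingRpc across the cycle. */
   final class PendingRpcSketch {
       boolean pendingRpc = true; // a registration request is outstanding
       int sendAttempts = 0;

       /** Runs the cycle until the request ages out and returns the final state. */
       static PendingRpcSketch run(boolean resetOnTimeout) {
           PendingRpcSketch state = new PendingRpcSketch();
           RequestQueueSketch queue = new RequestQueueSketch(30);
           queue.enqueue(new RequestQueueSketch.QueueItem(0, () -> {
               if (resetOnTimeout) {
                   state.pendingRpc = false; // the one-line change in this PR
               }
           }));
           // Each tick: try to send, then lose the connection and re-enqueue.
           for (long now = 0; now <= 40; now += 10) {
               RequestQueueSketch.QueueItem sent = queue.generateRequest(now);
               if (sent == null) {
                   break; // the item timed out and was removed from the queue
               }
               state.sendAttempts++;
               queue.onDisconnect(sent);
           }
           return state;
       }
   }
   ```
   
   In this sketch, `PendingRpcSketch.run(true)` ends with `pendingRpc` cleared 
after the item ages out, while `run(false)` leaves it set with nothing left in 
the queue, so nothing would ever trigger another send.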



