abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533814137



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends 
BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       That's what I decide to do eventually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to