mumrah commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537948141



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,34 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends ControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, 
inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when 
sending AlterIsr to the controller")
+      }
     }
 
     debug(s"Sending AlterIsr to controller $message")
-    controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), 
responseHandler)
+    // We will not timeout AlterISR request, instead letting it retry 
indefinitely
+    // until a response is received or the request is cancelled after 
receiving new LeaderAndIsr state.

Review comment:
       I don't think we actually have the ability to cancel in-flight requests. 
The problem is that many partitions can have ISR changes batched together in a 
single request. So we couldn't cancel one without cancelling them all. 
   
   A new LeaderAndIsr arriving will just overwrite the existing `isrState` in 
Partition which will effectively cause any in-flight AlterIsr response to be 
ignored (for that partition)

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,34 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends ControllerRequestCompletionHandler {

Review comment:
       nit: could we inline this as an anonymous class down below rather than 
defining it here? Just seems a little odd to define it like this and then 
calling in to the containing class for `inflightRequest`. 

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends 
BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when 
sending AlterIsr to the controller")

Review comment:
       Thanks, makes sense. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to