abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r527359174
########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - def responseHandler(response: ClientResponse): Unit = { - try { - val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) - } finally { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + + def clearInflightRequests(): Unit = { + // Be sure to clear the in-flight flag to allow future AlterIsr requests + if (!inflightRequest.compareAndSet(true, false)) { + throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } + } + + class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(response: ClientResponse): Unit = { + try { + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) + } finally { + clearInflightRequests() } } + + override def onFailure(exception: RuntimeException): Unit = { + error(s"Encountered $exception when sending AlterIsr to the controller, clearing all pending states") + unsentIsrUpdates.clear() Review comment: Yea, I'm not sure whether we should clear all the pending updates here or not, just saw the comment as `Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates`, so thinking maybe we could just remove all pending updates here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org