hachikuji commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r494660983
########## File path: core/src/main/scala/kafka/cluster/Partition.scala ########## @@ -1366,6 +1366,12 @@ class Partition(val topicPartition: TopicPartition, } } + /** + * This is called for each partition in the body of an AlterIsr response. For errors which are non-retryable we simply Review comment: nit: conventionally we prefer "retriable" ########## File path: core/src/main/scala/kafka/cluster/Partition.scala ########## @@ -1375,11 +1381,14 @@ class Partition(val topicPartition: TopicPartition, case Errors.FENCED_LEADER_EPOCH => debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.") case Errors.INVALID_UPDATE_VERSION => - debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Retrying.") - sendAlterIsrRequest(isrState) + debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to invalid zk version. Giving up.") case _ => - warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. Retrying.") - sendAlterIsrRequest(isrState) + if (isrState.isInflight) { + warn(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} due to $error. 
Retrying.") + sendAlterIsrRequest(isrState) + } else { + warn(s"Ignoring failed ISR update to ${proposedIsr.mkString(",")} since due to $error since we have a committed ISR.") Review comment: nit (for follow-up): fix grammar "since due" ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1847,21 +1847,22 @@ class KafkaController(val config: KafkaConfig, // Determine which partitions we will accept the new ISR for val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = isrsToAlter.flatMap { case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) => - val partitionError: Errors = controllerContext.partitionLeadershipInfo(tp) match { + controllerContext.partitionLeadershipInfo(tp) match { case Some(leaderIsrAndControllerEpoch) => val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr if (newLeaderAndIsr.leaderEpoch < currentLeaderAndIsr.leaderEpoch) { - Errors.FENCED_LEADER_EPOCH + partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH) + None + } else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) { + // If a partition is already in the desired state, just return it Review comment: It might be worth mentioning that this could happen in the case of a retry after a successful update. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3066,20 +3066,18 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { val alterIsrRequest = request.body[AlterIsrRequest] - if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - if (!controller.isActive) { - sendResponseMaybeThrottle(request, requestThrottleMs => - alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception())) - } else { - controller.alterIsrs(alterIsrRequest.data, - alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs)) - ) - ) - } - } else { + if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { sendResponseMaybeThrottle(request, requestThrottleMs => alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) + } else if (!controller.isActive) { + sendResponseMaybeThrottle(request, requestThrottleMs => + alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception())) Review comment: nit: leave off the parentheses after `exception` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org