hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r494660983



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1366,6 +1366,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  /**
+   * This is called for each partition in the body of an AlterIsr response. 
For errors which are non-retryable we simply

Review comment:
       nit: conventionally we prefer "retriable"

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1375,11 +1381,14 @@ class Partition(val topicPartition: TopicPartition,
           case Errors.FENCED_LEADER_EPOCH =>
             debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
           case Errors.INVALID_UPDATE_VERSION =>
-            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Retrying.")
-            sendAlterIsrRequest(isrState)
+            debug(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to invalid zk version. Giving up.")
           case _ =>
-            warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
-            sendAlterIsrRequest(isrState)
+            if (isrState.isInflight) {
+              warn(s"Controller failed to update ISR to 
${proposedIsr.mkString(",")} due to $error. Retrying.")
+              sendAlterIsrRequest(isrState)
+            } else {
+              warn(s"Ignoring failed ISR update to 
${proposedIsr.mkString(",")} since due to $error since we have a committed 
ISR.")

Review comment:
       nit (for follow-up): fix grammar "since due"

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1847,21 +1847,22 @@ class KafkaController(val config: KafkaConfig,
       // Determine which partitions we will accept the new ISR for
       val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
isrsToAlter.flatMap {
         case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          val partitionError: Errors = 
controllerContext.partitionLeadershipInfo(tp) match {
+          controllerContext.partitionLeadershipInfo(tp) match {
             case Some(leaderIsrAndControllerEpoch) =>
               val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
               if (newLeaderAndIsr.leaderEpoch < 
currentLeaderAndIsr.leaderEpoch) {
-                Errors.FENCED_LEADER_EPOCH
+                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+                None
+              } else if (newLeaderAndIsr.equalsIgnoreZk(currentLeaderAndIsr)) {
+                // If a partition is already in the desired state, just return 
it

Review comment:
       It might be worth mentioning that this could happen in the case of a 
retry after a successful update.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3066,20 +3066,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
     val alterIsrRequest = request.body[AlterIsrRequest]
 
-    if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-      if (!controller.isActive) {
-        sendResponseMaybeThrottle(request, requestThrottleMs =>
-          alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))
-      } else {
-        controller.alterIsrs(alterIsrRequest.data,
-          alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs 
=>
-            new 
AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs))
-          )
-        )
-      }
-    } else {
+    if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    } else if (!controller.isActive) {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        alterIsrRequest.getErrorResponse(requestThrottleMs, 
Errors.NOT_CONTROLLER.exception()))

Review comment:
       nit: leave off parenthesis after `exception`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to