hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r894152987


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2319,125 +2324,117 @@ class KafkaController(val config: KafkaConfig,
     }
 
     val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
-    try {
-      // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, 
newLeaderAndIsr) =>
-        controllerContext.partitionLeadershipInfo(tp) match {
-          case Some(leaderIsrAndControllerEpoch) =>
-            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-            if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
-              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-              None
-            } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
-              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-              None
-            } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-              // If a partition is already in the desired state, just return it
-              partitionResponses(tp) = Right(currentLeaderAndIsr)
-              None
-            } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-              info(
-                s"Rejecting AlterPartition from node $brokerId for $tp because 
leader is recovering and ISR is greater than 1: " +
-                s"$newLeaderAndIsr"
-              )
-              None
-            } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
-              newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
-
-              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-              info(
-                s"Rejecting AlterPartition from node $brokerId for $tp because 
the leader recovery state cannot change from " +
-                s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-              )
-              None
-            } else {
-              Some(tp -> newLeaderAndIsr)
-            }
-
-          case None =>
-            partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs = partitionsToAlter.flatMap { case (tp, newLeaderAndIsr) 
=>
+      controllerContext.partitionLeadershipInfo(tp) match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+            partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
             None
-        }
-      }
+          } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
+            partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+            None
+          } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+            // If a partition is already in the desired state, just return it
+            partitionResponses(tp) = Right(currentLeaderAndIsr)
+            None
+          } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because 
leader is recovering and ISR is greater than 1: " +
+              s"$newLeaderAndIsr"
+            )
+            None
+          } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
+            newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
 
-      // Do the updates in ZK
-      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
-      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = 
zkClient.updateLeaderAndIsr(
-        adjustedIsrs, controllerContext.epoch, 
controllerContext.epochZkVersion)
-
-      val successfulUpdates = finishedUpdates.flatMap { case (partition, 
isrOrError) =>
-        isrOrError match {
-          case Right(updatedIsr) =>
-            debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
-            partitionResponses(partition) = Right(updatedIsr)
-            Some(partition -> updatedIsr)
-          case Left(e) =>
-            error(s"Failed to update ISR for partition $partition", e)
-            partitionResponses(partition) = Left(Errors.forException(e))
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because 
the leader recovery state cannot change from " +
+              s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+            )
             None
-        }
+          } else {
+            Some(tp -> newLeaderAndIsr)
+          }
+
+        case None =>
+          partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          None
       }
+    }
 
-      badVersionUpdates.foreach { partition =>
-        info(s"Failed to update ISR to ${adjustedIsrs(partition)} for 
partition $partition, bad ZK version.")
-        partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+    // Do the updates in ZK
+    debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = 
zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates = finishedUpdates.flatMap { case (partition, 
isrOrError) =>
+      isrOrError match {
+        case Right(updatedIsr) =>
+          debug(s"ISR for partition $partition updated to $updatedIsr.")
+          partitionResponses(partition) = Right(updatedIsr)
+          Some(partition -> updatedIsr)
+        case Left(e) =>
+          error(s"Failed to update ISR for partition $partition", e)
+          partitionResponses(partition) = Left(Errors.forException(e))
+          None
       }
+    }
 
-      // Update our cache and send out metadata updates
-      updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
-      sendUpdateMetadataRequest(
-        controllerContext.liveOrShuttingDownBrokerIds.toSeq,
-        partitionsToAlter.keySet
-      )
+    badVersionUpdates.foreach { partition =>
+      info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition 
$partition, bad ZK version.")
+      partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+    }
 
-      partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, 
partitionResponses) =>
-        // Add each topic part to the response
-        val topicResponse = if (useTopicsIds) {
-          new AlterPartitionResponseData.TopicData()
-            .setTopicId(controllerContext.topicIds.getOrElse(topicName, 
Uuid.ZERO_UUID))
-        } else {
-          new AlterPartitionResponseData.TopicData()
-            .setTopicName(topicName)
-        }
-        alterPartitionResponse.topics.add(topicResponse)
-
-        partitionResponses.forKeyValue { (tp, errorOrIsr) =>
-          // Add each partition part to the response (new ISR or error)
-          errorOrIsr match {
-            case Left(error) =>
-              topicResponse.partitions.add(
-                new AlterPartitionResponseData.PartitionData()
-                  .setPartitionIndex(tp.partition)
-                  .setErrorCode(error.code))
-            case Right(leaderAndIsr) =>
-              /* Setting the LeaderRecoveryState field is always safe because 
it will always be the same
-               * as the value set in the request. For version 0, that is 
always the default RECOVERED
-               * which is ignored when serializing to version 0. For any other 
version, the
-               * LeaderRecoveryState field is supported.
-               */
-              topicResponse.partitions.add(
-                new AlterPartitionResponseData.PartitionData()
-                  .setPartitionIndex(tp.partition)
-                  .setLeaderId(leaderAndIsr.leader)
-                  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                  
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                  .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-              )
-          }
+    // Update our cache and send out metadata updates
+    updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+    sendUpdateMetadataRequest(
+      controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+      partitionsToAlter.keySet
+    )
+
+    partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, 
partitionResponses) =>
+      // Add each topic part to the response
+      val topicResponse = if (useTopicsIds) {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicId(controllerContext.topicIds.getOrElse(topicName, 
Uuid.ZERO_UUID))
+      } else {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicName(topicName)
+      }
+      alterPartitionResponse.topics.add(topicResponse)
+
+      partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+        // Add each partition part to the response (new ISR or error)
+        errorOrIsr match {
+          case Left(error) =>
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code))
+          case Right(leaderAndIsr) =>
+            /* Setting the LeaderRecoveryState field is always safe because it 
will always be the same
+             * as the value set in the request. For version 0, that is always 
the default RECOVERED
+             * which is ignored when serializing to version 0. For any other 
version, the
+             * LeaderRecoveryState field is supported.
+             */
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setLeaderId(leaderAndIsr.leader)
+                .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+                .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+            )
         }
       }
-
-      callback(alterPartitionResponse)
-    } catch {
-      case e: Throwable =>
-        error(s"Error when processing AlterPartition for partitions: 
${partitionsToAlter.keys.toSeq}", e)
-        callback(new 
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
-        // We consider that all partitions have failed.
-        partitionResponses.clear()
     }
 
+    callback(alterPartitionResponse)

Review Comment:
   Is it possible for the logic below to raise an exception? Would there be any 
consequences to invoking the callback twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to