hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892928790


##########
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java:
##########
@@ -57,8 +57,13 @@ public static class Builder extends AbstractRequest.Builder<AlterPartitionReques
 
         private final AlterPartitionRequestData data;
 
-        public Builder(AlterPartitionRequestData data) {
-            super(ApiKeys.ALTER_PARTITION);
+        public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {

Review Comment:
   It may be useful to document the assumption that `AlterPartitionRequestData` is setting both `topicName` and `topicId`.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1571,14 +1620,26 @@ class Partition(val topicPartition: TopicPartition,
     error match {
       case Errors.OPERATION_NOT_ATTEMPTED =>
        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
+        partitionState = proposedIsrState.lastCommittedState
        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+          s"Partition state has been reset to the latest committed state $partitionState")
+        false
+      case Errors.INELIGIBLE_REPLICA =>
+        // Since the operation was rejected, it is safe to reset back to the committed state. This
+        // assumes that the current state was still the correct expected state.
+        // This is only raised in KRaft mode.
+        partitionState = proposedIsrState.lastCommittedState
+        debug(s"Failed to alter partition to $proposedIsrState since the controller rejected at least one replica " +

Review Comment:
   Do you think we could raise these two log messages to info? I think they are rare events, and logging them at info ensures that we will see the transition back to the old state.
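   A rough sketch of what that could look like for the `INELIGIBLE_REPLICA` case, reusing the message text from the diff above:
   ```scala
   case Errors.INELIGIBLE_REPLICA =>
     partitionState = proposedIsrState.lastCommittedState
     // info rather than debug so the revert to the committed state shows up with default log levels
     info(s"Failed to alter partition to $proposedIsrState since the controller rejected at least one replica " +
       s"because it is ineligible to join the ISR. Partition state has been reset to the latest committed state $partitionState.")
     false
   ```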



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,228 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    try {
+      doProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", 
e)
+        callback(new 
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+  private def doProcessAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+          }
+
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
+                partitionReq.partitionEpoch
+              )
+            )
+          }
+      }
+    }
+
+    val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    try {
       // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
partitionsToAlter.flatMap {
-        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          controllerContext.partitionLeadershipInfo(tp) match {
-            case Some(leaderIsrAndControllerEpoch) =>
-              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
-                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-                None
-              } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return 
it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
-              } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because leader is recovering and ISR is greater than 1: " +
-                  s"$newLeaderAndIsr"
-                )
-                None
-              } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
-                newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
-
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because the leader recovery state cannot change from " +
-                  s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-                )
-                None
-              } else {
-                Some(tp -> newLeaderAndIsr)
-              }
-            case None =>
-              partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, newLeaderAndIsr) =>
+        controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
              None
-          }
+            } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
+              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)

Review Comment:
   Not for this PR, but should we consider changing this to INVALID_PARTITION_EPOCH?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1571,14 +1620,26 @@ class Partition(val topicPartition: TopicPartition,
     error match {
       case Errors.OPERATION_NOT_ATTEMPTED =>
        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
+        partitionState = proposedIsrState.lastCommittedState
        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+          s"Partition state has been reset to the latest committed state $partitionState")
+        false
+      case Errors.INELIGIBLE_REPLICA =>
+        // Since the operation was rejected, it is safe to reset back to the committed state. This
+        // assumes that the current state was still the correct expected state.
+        // This is only raised in KRaft mode.
+        partitionState = proposedIsrState.lastCommittedState
+        debug(s"Failed to alter partition to $proposedIsrState since the controller rejected at least one replica " +
+          s"because it is ineligible to join the ISR. Partition state has been reset to the latest committed state $partitionState.")
         false
       case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
        debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
          "this topic or partition. Giving up.")
        false
+      case Errors.UNKNOWN_TOPIC_ID =>
+        debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
+          "this topic. Giving up.")

Review Comment:
   Instead of saying "Giving up," perhaps we could say that our state may be out of sync and we will await the latest metadata?
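   Something along these lines, for example (exact wording is just a sketch):
   ```scala
   case Errors.UNKNOWN_TOPIC_ID =>
     debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
       "this topic. Partition state may be out of sync, awaiting the latest metadata.")
     false
   ```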



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,228 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    try {
+      doProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", 
e)
+        callback(new 
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+  private def doProcessAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+          }
+
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
+                partitionReq.partitionEpoch
+              )
+            )
+          }
+      }
+    }
+
+    val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    try {
       // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
partitionsToAlter.flatMap {
-        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          controllerContext.partitionLeadershipInfo(tp) match {
-            case Some(leaderIsrAndControllerEpoch) =>
-              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
-                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-                None
-              } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return 
it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
-              } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because leader is recovering and ISR is greater than 1: " +
-                  s"$newLeaderAndIsr"
-                )
-                None
-              } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
-                newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
-
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because the leader recovery state cannot change from " +
-                  s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-                )
-                None
-              } else {
-                Some(tp -> newLeaderAndIsr)
-              }
-            case None =>
-              partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, 
newLeaderAndIsr) =>
+        controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
+              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
               None
-          }
+            } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
+              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+              None
+            } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+              // If a partition is already in the desired state, just return it
+              partitionResponses(tp) = Right(currentLeaderAndIsr)
+              None
+            } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
leader is recovering and ISR is greater than 1: " +
+                s"$newLeaderAndIsr"
+              )
+              None
+            } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
+              newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
+
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
the leader recovery state cannot change from " +
+                s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+              )
+              None
+            } else {
+              Some(tp -> newLeaderAndIsr)
+            }
+
+          case None =>
+            partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            None
+        }
       }
 
       // Do the updates in ZK
       debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
       val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = 
zkClient.updateLeaderAndIsr(
         adjustedIsrs, controllerContext.epoch, 
controllerContext.epochZkVersion)
 
-      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = 
finishedUpdates.flatMap {
-        case (partition: TopicPartition, isrOrError: Either[Throwable, 
LeaderAndIsr]) =>
-          isrOrError match {
-            case Right(updatedIsr) =>
-              debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
-              partitionResponses(partition) = Right(updatedIsr)
-              Some(partition -> updatedIsr)
-            case Left(e) =>
-              error(s"Failed to update ISR for partition $partition", e)
-              partitionResponses(partition) = Left(Errors.forException(e))
-              None
-          }
+      val successfulUpdates = finishedUpdates.flatMap { case (partition, isrOrError) =>
+        isrOrError match {
+          case Right(updatedIsr) =>
+            debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.partitionEpoch}]")

Review Comment:
   nit: maybe we can just log `updatedIsr`?
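   e.g. simply:
   ```scala
   debug(s"ISR for partition $partition updated to $updatedIsr")
   ```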



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -58,6 +58,10 @@ trait MetadataCache {
 
   def hasAliveBroker(brokerId: Int): Boolean
 
+  def isBrokerFenced(brokerId: Int): Boolean
+
+  def isBrokerInControlledShutdown(brokerId: Int): Boolean

Review Comment:
   nit: how about `isBrokerShuttingDown`?
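   i.e. same signature as in the diff, just the new name:
   ```scala
   def isBrokerShuttingDown(brokerId: Int): Boolean
   ```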



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1230,6 +1230,8 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId).asJava
 
+
+

Review Comment:
   nit: extra newlines



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,228 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
+  }
 
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
+  private def processAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    try {
+      doProcessAlterPartition(
+        alterPartitionRequest,
+        alterPartitionRequestVersion,
+        callback
+      )
+    } catch {
+      case e: Throwable =>
+        error(s"Error when processing AlterPartition: $alterPartitionRequest", 
e)
+        callback(new 
AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
     }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
   }
 
-  private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+  private def doProcessAlterPartition(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+          }
+
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
+                partitionReq.partitionEpoch
+              )
+            )
+          }
+      }
+    }
+
+    val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    try {
       // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs: Map[TopicPartition, LeaderAndIsr] = 
partitionsToAlter.flatMap {
-        case (tp: TopicPartition, newLeaderAndIsr: LeaderAndIsr) =>
-          controllerContext.partitionLeadershipInfo(tp) match {
-            case Some(leaderIsrAndControllerEpoch) =>
-              val currentLeaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr
-              if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
-                partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-                None
-              } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
-                partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-                None
-              } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-                // If a partition is already in the desired state, just return 
it
-                partitionResponses(tp) = Right(currentLeaderAndIsr)
-                None
-              } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because leader is recovering and ISR is greater than 1: " +
-                  s"$newLeaderAndIsr"
-                )
-                None
-              } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
-                newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
-
-                partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-                info(
-                  s"Rejecting AlterPartition from node $brokerId for $tp 
because the leader recovery state cannot change from " +
-                  s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-                )
-                None
-              } else {
-                Some(tp -> newLeaderAndIsr)
-              }
-            case None =>
-              partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, 
newLeaderAndIsr) =>
+        controllerContext.partitionLeadershipInfo(tp) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            if (newLeaderAndIsr.leaderEpoch != 
currentLeaderAndIsr.leaderEpoch) {
+              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
               None
-          }
+            } else if (newLeaderAndIsr.partitionEpoch < 
currentLeaderAndIsr.partitionEpoch) {
+              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+              None
+            } else if 
(newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+              // If a partition is already in the desired state, just return it
+              partitionResponses(tp) = Right(currentLeaderAndIsr)
+              None
+            } else if (newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
leader is recovering and ISR is greater than 1: " +
+                s"$newLeaderAndIsr"
+              )
+              None
+            } else if (currentLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERED &&
+              newLeaderAndIsr.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING) {
+
+              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+              info(
+                s"Rejecting AlterPartition from node $brokerId for $tp because 
the leader recovery state cannot change from " +
+                s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+              )
+              None
+            } else {
+              Some(tp -> newLeaderAndIsr)
+            }
+
+          case None =>
+            partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            None
+        }
       }
 
       // Do the updates in ZK
       debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
       val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = 
zkClient.updateLeaderAndIsr(
         adjustedIsrs, controllerContext.epoch, 
controllerContext.epochZkVersion)
 
-      val successfulUpdates: Map[TopicPartition, LeaderAndIsr] = 
finishedUpdates.flatMap {
-        case (partition: TopicPartition, isrOrError: Either[Throwable, 
LeaderAndIsr]) =>
-          isrOrError match {
-            case Right(updatedIsr) =>
-              debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
-              partitionResponses(partition) = Right(updatedIsr)
-              Some(partition -> updatedIsr)
-            case Left(e) =>
-              error(s"Failed to update ISR for partition $partition", e)
-              partitionResponses(partition) = Left(Errors.forException(e))
-              None
-          }
+      val successfulUpdates = finishedUpdates.flatMap { case (partition, 
isrOrError) =>
+        isrOrError match {
+          case Right(updatedIsr) =>
+            debug(s"ISR for partition $partition updated to 
[${updatedIsr.isr.mkString(",")}] and zkVersion updated to 
[${updatedIsr.partitionEpoch}]")
+            partitionResponses(partition) = Right(updatedIsr)
+            Some(partition -> updatedIsr)
+          case Left(e) =>
+            error(s"Failed to update ISR for partition $partition", e)
+            partitionResponses(partition) = Left(Errors.forException(e))
+            None
+        }
       }
 
       badVersionUpdates.foreach { partition =>
         info(s"Failed to update ISR to ${adjustedIsrs(partition)} for 
partition $partition, bad ZK version.")
         partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
       }
 
-      def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
-        val liveBrokers: Seq[Int] = 
controllerContext.liveOrShuttingDownBrokerIds.toSeq
-        sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
-      }
-
       // Update our cache and send out metadata updates
       updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
-      processUpdateNotifications(partitionsToAlter.keys.toSeq)
+      sendUpdateMetadataRequest(
+        controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+        partitionsToAlter.keySet
+      )
+
+      partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, 
partitionResponses) =>
+        // Add each topic part to the response
+        val topicResponse = if (useTopicsIds) {
+          new AlterPartitionResponseData.TopicData()
+            .setTopicId(controllerContext.topicIds.getOrElse(topicName, 
Uuid.ZERO_UUID))
+        } else {
+          new AlterPartitionResponseData.TopicData()
+            .setTopicName(topicName)
+        }
+        alterPartitionResponse.topics.add(topicResponse)
+
+        partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+          // Add each partition part to the response (new ISR or error)
+          errorOrIsr match {
+            case Left(error) =>
+              topicResponse.partitions.add(
+                new AlterPartitionResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code))
+            case Right(leaderAndIsr) =>
+              /* Setting the LeaderRecoveryState field is always safe because 
it will always be the same
+               * as the value set in the request. For version 0, that is 
always the default RECOVERED
+               * which is ignored when serializing to version 0. For any other 
version, the
+               * LeaderRecoveryState field is supported.
+               */
+              topicResponse.partitions.add(
+                new AlterPartitionResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setLeaderId(leaderAndIsr.leader)
+                  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                  
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+                  .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+              )
+          }
+        }
+      }
 
-      Left(partitionResponses)
+      callback(alterPartitionResponse)
     } catch {
      case e: Throwable =>
        error(s"Error when processing AlterPartition for partitions: ${partitionsToAlter.keys.toSeq}", e)
-        Right(Errors.UNKNOWN_SERVER_ERROR)
+        callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))

Review Comment:
   Could we let this throw to `processAlterPartition`?
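   The error handling would then live only in `processAlterPartition`, roughly as it already does there (sketch):
   ```scala
   private def processAlterPartition(
     alterPartitionRequest: AlterPartitionRequestData,
     alterPartitionRequestVersion: Short,
     callback: AlterPartitionResponseData => Unit
   ): Unit = {
     try {
       // doProcessAlterPartition drops its local try/catch; any Throwable it
       // raises is logged here and mapped to UNKNOWN_SERVER_ERROR.
       doProcessAlterPartition(alterPartitionRequest, alterPartitionRequestVersion, callback)
     } catch {
       case e: Throwable =>
         error(s"Error when processing AlterPartition: $alterPartitionRequest", e)
         callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
     }
   }
   ```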



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -857,26 +869,68 @@ public void testShrinkAndExpandIsr() throws Exception {
             new int[][] {new int[] {0, 1, 2}});
 
         TopicIdPartition topicIdPartition = new 
TopicIdPartition(createTopicResult.topicId(), 0);
-        TopicPartition topicPartition = new TopicPartition("foo", 0);
         assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
         long brokerEpoch = ctx.currentBrokerEpoch(0);
         PartitionData shrinkIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1), 
LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = 
sendAlterIsr(
-            replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
+        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = 
sendAlterPartition(
+            replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), 
shrinkIsrRequest);
         AlterPartitionResponseData.PartitionData shrinkIsrResponse = 
assertAlterPartitionResponse(
-            shrinkIsrResult, topicPartition, NONE);
+            shrinkIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, shrinkIsrResponse);
 
         PartitionData expandIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, asList(0, 1, 2), 
LeaderRecoveryState.RECOVERED);
-        ControllerResult<AlterPartitionResponseData> expandIsrResult = 
sendAlterIsr(
-            replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
+        ControllerResult<AlterPartitionResponseData> expandIsrResult = 
sendAlterPartition(
+            replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), 
expandIsrRequest);
         AlterPartitionResponseData.PartitionData expandIsrResponse = 
assertAlterPartitionResponse(
-            expandIsrResult, topicPartition, NONE);
+            expandIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, expandIsrResponse);
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {

Review Comment:
   Might also be useful to cover the case where the request TopicId does not match the one the controller has.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1588,6 +1649,14 @@ class Partition(val topicPartition: TopicPartition,
       case Errors.INVALID_REQUEST =>
        debug(s"Failed to alter partition to $proposedIsrState because the request is invalid. Giving up.")
        false
+      case Errors.NEW_LEADER_ELECTED =>
+        // The operation completed successfully but this replica got removed from the replica set by the controller
+        // while completing a ongoing reassignment. This replica is no longer the leader but it does not know it
+        // yet. It should remain in the current pending state until the metadata overrides it.
+        // This is only raised in KRaft mode.
+        debug("The alter partition request successfully updated the partition state but this replica got " +

Review Comment:
   Would it be worthwhile to include the proposed partition state in this message?
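   e.g. something like (the tail of the message is truncated in this view, so the wording here is only a placeholder):
   ```scala
   debug(s"The alter partition request successfully updated the partition state to $proposedIsrState " +
     "but this replica got removed from the replica set by the controller. Waiting on new metadata.")
   ```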



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1571,14 +1620,26 @@ class Partition(val topicPartition: TopicPartition,
     error match {
       case Errors.OPERATION_NOT_ATTEMPTED =>
        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
+        partitionState = proposedIsrState.lastCommittedState

Review Comment:
   I have been trying to think through the consequence of sending `AlterPartition` to a stale controller. We are trusting the returned error code to be a statement not just of the controller handling the request, but of the metadata log itself. What would happen if our request were successfully processed by the current controller, but we lost the response? Is it possible for the request to be retried on an old controller? I think the protection from the raft layer is sufficient to rule this out. It ensures that we can only find new controllers with a larger epoch. Is that good enough? It might be worth adding some comments here to convince ourselves that it is indeed safe to revert back to the last committed state.
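   For example, a comment along these lines above the reset (just a sketch of the reasoning above, not a verified guarantee):
   ```scala
   case Errors.OPERATION_NOT_ATTEMPTED =>
     // Since the operation was not attempted, it is safe to reset back to the committed state.
     // Note: we rely on the raft layer only letting a retried AlterPartition reach a controller
     // with an epoch at least as large as the one that produced this error, so the error code
     // reflects the metadata log rather than a stale controller's local view.
     partitionState = proposedIsrState.lastCommittedState
   ```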



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1332,6 +1334,166 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @Test
+  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catches up to the log end offset and

Review Comment:
   nit: catches -> catch (same in test below)



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1332,6 +1334,166 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @Test
+  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catches up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is fenced.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)

Review Comment:
   Maybe we can assert `!partitionState.isInflight`?
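   e.g.:
   ```scala
   assertFalse(partition.partitionState.isInflight)
   ```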



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1332,6 +1334,166 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @Test
+  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catches up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is fenced.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The leader eventually learns about the fenced broker.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is not triggered because the follower is fenced.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The broker is eventually unfenced.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Expansion succeeds.
+    alterPartitionManager.completeIsrUpdate(1)
+
+    // ISR is committed.
+    assertEquals(replicas.toSet, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+  }
+
+  @Test
+  def testIsrNotExpandedIfReplicaIsInControlledShutdown(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catch up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is in controlled shutdown.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The leader eventually learns that the broker is in controlled shutdown.
+    when(metadataCache.isBrokerInControlledShutdown(remoteBrokerId)).thenReturn(true)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is not triggered because the follower is in controlled shutdown.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The broker is eventually comes back.

Review Comment:
   nit: drop "is"
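   For example, a minimal sketch of the reworded comment (assuming the next line of the test resets the mock to `false`, which the quoted hunk cuts off just before):
   ```scala
   // The broker eventually comes back.
   when(metadataCache.isBrokerInControlledShutdown(remoteBrokerId)).thenReturn(false)
   ```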



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -855,29 +856,49 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
 
-    val latch = new CountDownLatch(1)
     val controller = getController().kafkaController
-
     val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
     val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds.getOrElse(tp.topic, Uuid.ZERO_UUID)

Review Comment:
   Do we have both cases covered?
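   For illustration, the two branches of that `getOrElse` are roughly the following (a standalone sketch with a hypothetical topic map, not the integration test itself):
   ```scala
   import org.apache.kafka.common.Uuid

   // Case 1: the controller context has an ID for the topic, so the real UUID is used.
   val topicIds: Map[String, Uuid] = Map("topic-with-id" -> Uuid.randomUuid())
   val resolvedId = topicIds.getOrElse("topic-with-id", Uuid.ZERO_UUID)

   // Case 2: no topic ID has been assigned yet, so the lookup falls back to Uuid.ZERO_UUID.
   val fallbackId = topicIds.getOrElse("topic-without-id", Uuid.ZERO_UUID)
   ```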



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1332,6 +1334,166 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @Test
+  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List(brokerId, remoteBrokerId)
+    val isr = Set(brokerId)
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catch up to the log end offset and
+    // to check if an expansion is possible.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Follower fetches and catches up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Controller rejects the expansion because the broker is fenced.
+    alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
+
+    // The leader reverts back to the previous ISR.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The leader eventually learns about the fenced broker.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is not triggered because the follower is fenced.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // The broker is eventually unfenced.
+    when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
+
+    // The follower fetches again.
+    fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+
+    // Expansion succeeds.
+    alterPartitionManager.completeIsrUpdate(1)

Review Comment:
   nit: can we name the parameter `newPartitionEpoch=1`?
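   For instance (assuming the mock's `completeIsrUpdate` parameter is indeed called `newPartitionEpoch`):
   ```scala
   // Expansion succeeds; the controller has bumped the partition epoch to 1.
   alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
   ```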



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
