hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r891635253


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -915,15 +918,21 @@ Set<TopicIdPartition> imbalancedPartitions() {
         return new HashSet<>(imbalancedPartitions);
     }
 
-    ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionRequestData request) {
+    ControllerResult<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
         clusterControl.checkBrokerEpoch(request.brokerId(), 
request.brokerEpoch());
         AlterPartitionResponseData response = new AlterPartitionResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (AlterPartitionRequestData.TopicData topicData : request.topics()) 
{
             AlterPartitionResponseData.TopicData responseTopicData =
-                new 
AlterPartitionResponseData.TopicData().setName(topicData.name());
+                new AlterPartitionResponseData.TopicData().
+                    setTopicName(topicData.topicName()).
+                    setTopicId(topicData.topicId());
             response.topics().add(responseTopicData);
-            Uuid topicId = topicsByName.get(topicData.name());
+            Uuid topicId = topicData.topicId().equals(Uuid.ZERO_UUID) ?
+                topicsByName.get(topicData.topicName()) : topicData.topicId();

Review Comment:
   This wasn't covered in the KIP, but when we cannot find the provided 
`TopicId`, should we return `UNKNOWN_TOPIC_ID`?



##########
clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplica.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IneligibleReplica extends ApiException {

Review Comment:
   nit: I think the usual convention is to add the "Exception" suffix



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -167,7 +168,8 @@ sealed trait PendingPartitionChange extends PartitionState {
 case class PendingExpandIsr(
   isr: Set[Int],
   newInSyncReplicaId: Int,
-  sentLeaderAndIsr: LeaderAndIsr
+  sentLeaderAndIsr: LeaderAndIsr,
+  partitionStateToRollBackTo: PartitionState

Review Comment:
   nit: Wonder if we could use a more concise name. Maybe `priorState` or 
`lastCommittedState`?



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
-
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
-    }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
   }
 
   private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))

Review Comment:
   Similarly, we should use `UNKNOWN_TOPIC_ID`.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1085,22 +1108,36 @@ private Errors validateAlterPartitionData(
         if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && 
newIsr.length > 1) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "the ISR {} had more than one replica while the leader was 
still " +
-                    "recovering from an unlcean leader election {}.",
+                    "recovering from an unclean leader election {}.",
                     brokerId, topic.name, partitionId, partitionData.newIsr(),
                     leaderRecoveryState);
 
             return INVALID_REQUEST;
         }
         if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
                 leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
-
             log.info("Rejecting AlterPartition request from node {} for {}-{} 
because " +
                     "the leader recovery state cannot change from RECOVERED to 
RECOVERING.",
                     brokerId, topic.name, partitionId);
 
             return INVALID_REQUEST;
         }
 
+        List<Integer> ineligibleReplicas = partitionData.newIsr().stream()
+            .filter(replica -> !isAcceptableReplica.apply(replica))
+            .collect(Collectors.toList());
+        if (ineligibleReplicas.size() > 0) {

Review Comment:
   nit: `!ineligibleReplicas.isEmpty()`?



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
-
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
-    }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
+  def alterPartitions(

Review Comment:
   Just checking my understanding. It looks like we have not modified this 
logic to use `INELIGIBLE_REPLICA`. Is that right? Should we?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1588,6 +1635,10 @@ class Partition(val topicPartition: TopicPartition,
       case Errors.INVALID_REQUEST =>
         debug(s"Failed to alter partition to $proposedIsrState because the 
request is invalid. Giving up.")
         false
+      case Errors.NEW_LEADER_ELECTED =>
+        // This is only raised in KRaft mode.
+        debug(s"ISR updated to ${partitionState.isr.mkString(",")} but this 
broker is not longer the leader.")

Review Comment:
   I wonder if the comment is accurate. I think this is handling the situation 
where the current leader gets kicked out of the replica set and the ISR after a 
reassignment completes? So we don't really know what the new ISR is.
   
   By the way, I've always had misgivings about how this is handled, but I 
haven't thought of a specific problem. The now-defunct leader will continue 
acting as the leader and can even commit new data as long as all the members of 
`maximalIsr` continue fetching from it. It might be worth adding a comment 
explaining the expected behavior here. For example, it would be a bug to reset 
to `partitionStateToRollBackTo` after receiving this error.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1625,8 +1676,8 @@ class Partition(val topicPartition: TopicPartition,
       info(s"ISR updated to ${partitionState.isr.mkString(",")} and version 
updated to $partitionEpoch")
 
       proposedIsrState match {
-        case PendingExpandIsr(_, _, _) => 
alterPartitionListener.markIsrExpand()
-        case PendingShrinkIsr(_, _, _) => 
alterPartitionListener.markIsrShrink()
+        case PendingExpandIsr(_, _, _, _) => 
alterPartitionListener.markIsrExpand()

Review Comment:
   We could probably invert this and avoid the match.
   
   ```scala
   sealed trait PartitionState {
     def notifyListener(alterPartitionListener)
   }
   
   case class PendingExpand {
     def notifyListener(alterPartitionListener) = 
alterPartitionListener.markIsrExpand()
   }
   ```
   
   



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)

Review Comment:
   Not really an issue, but I find it a little strange that we set `topicId` 
even when `canUseTopicIds` is false. Similarly, we can skip updating 
`topicNamesByIds` when `canUseTopicIds` is false.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -988,9 +1006,11 @@ ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionReques
                             "the ongoing partition reassignment and triggered 
a " +
                             "leadership change. Returning 
FENCED_LEADER_EPOCH.",
                             request.brokerId(), topic.name, partitionId);
+                        Errors error = 
context.requestHeader().requestApiVersion() > 1 ?

Review Comment:
   nit: we need to update the comment above



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
-
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
-    }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
+  def alterPartitions(
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
+  ): Unit = {
+    eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionRequestVersion,
+      callback
+    ))
   }
 
   private def processAlterPartition(
-    brokerId: Int,
-    brokerEpoch: Long,
-    partitionsToAlter: Map[TopicPartition, LeaderAndIsr],
-    callback: AlterPartitionCallback
+    alterPartitionRequest: AlterPartitionRequestData,
+    alterPartitionRequestVersion: Short,
+    callback: AlterPartitionResponseData => Unit
   ): Unit = {
+    val useTopicsIds = alterPartitionRequestVersion > 1
 
     // Handle a few short-circuits
     if (!isActive) {
-      callback.apply(Right(Errors.NOT_CONTROLLER))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.NOT_CONTROLLER.code))
       return
     }
 
+    val brokerId = alterPartitionRequest.brokerId
+    val brokerEpoch = alterPartitionRequest.brokerEpoch
     val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
     if (brokerEpochOpt.isEmpty) {
       info(s"Ignoring AlterPartition due to unknown broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
     if (!brokerEpochOpt.contains(brokerEpoch)) {
       info(s"Ignoring AlterPartition due to stale broker epoch $brokerEpoch 
and local broker epoch $brokerEpochOpt for broker $brokerId")
-      callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+      callback(new 
AlterPartitionResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
       return
     }
 
-    val response = try {
-      val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, 
LeaderAndIsr]]()
+    val partitionsToAlter = new mutable.HashMap[TopicPartition, LeaderAndIsr]()
+    val alterPartitionResponse = new AlterPartitionResponseData()
 
+    alterPartitionRequest.topics.forEach { topicReq =>
+      val topicNameOpt = if (useTopicsIds) {
+        controllerContext.topicName(topicReq.topicId)
+      } else {
+        Some(topicReq.topicName)
+      }
+
+      topicNameOpt match {
+        case None =>
+          val topicResponse = new AlterPartitionResponseData.TopicData()
+            .setTopicId(topicReq.topicId)
+          alterPartitionResponse.topics.add(topicResponse)
+          topicReq.partitions.forEach { partitionReq =>
+            topicResponse.partitions.add(new 
AlterPartitionResponseData.PartitionData()
+              .setPartitionIndex(partitionReq.partitionIndex)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+          }
+
+        case Some(topicName) =>
+          topicReq.partitions.forEach { partitionReq =>
+            partitionsToAlter.put(
+              new TopicPartition(topicName, partitionReq.partitionIndex),
+              LeaderAndIsr(
+                alterPartitionRequest.brokerId,
+                partitionReq.leaderEpoch,
+                partitionReq.newIsr.asScala.toList.map(_.toInt),
+                LeaderRecoveryState.of(partitionReq.leaderRecoveryState),

Review Comment:
   Now that we're doing some of the logic in the event handler, I think we need 
to be careful with exceptions. For example, if we get an invalid 
`leaderRecoveryState`, this would throw. But now it would be caught in 
`ControllerEventManager` instead of `KafkaApis`, so there would be no logic to 
complete the request.



##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -263,7 +286,11 @@ class DefaultAlterPartitionManager(
         val partitionResponses = new mutable.HashMap[TopicPartition, 
Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
           topic.partitions.forEach { partition =>
-            val tp = new TopicPartition(topic.name, partition.partitionIndex)
+            val tp = new TopicPartition(
+              topicNamesByIds.getOrElse(topic.topicId, topic.topicName),

Review Comment:
   I guess we could get this from the MetadataCache as well. Also, I wonder if 
we should validate `topicName` is non-empty before using it?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -934,12 +943,21 @@ ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionReques
                 continue;
             }
 
+            BrokerHeartbeatManager heartbeatManager = 
clusterControl.heartbeatManager();

Review Comment:
   nit: seems unused?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -956,7 +974,7 @@ ControllerResult<AlterPartitionResponseData> 
alterPartition(AlterPartitionReques
                     partitionId,
                     clusterControl::active,
                     
featureControl.metadataVersion().isLeaderRecoverySupported());
-                if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
+                if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.topicName()))
 {

Review Comment:
   I might have missed it, but do we set the topic name somewhere if the 
request was passed the TopicId instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to