dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892262366


##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
       })
   }
 
-  private def buildRequest(inflightAlterPartitionItems: 
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+  private def buildRequest(
+    inflightAlterPartitionItems: Seq[AlterPartitionItem],
+  ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+    val metadataVersion = metadataVersionSupplier()
+    var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+    val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
     val message = new AlterPartitionRequestData()
       .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpochSupplier.apply())
+      .setBrokerEpoch(brokerEpochSupplier())
+
+    inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { 
case (topicName, items) =>
+      val topicId = items.head.topicIdPartition.topicId
+      // We use topic ids only if all the topics have one defined.
+      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      topicNamesByIds(topicId) = topicName
 
-      inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { 
case (topic, items) =>
       val topicData = new AlterPartitionRequestData.TopicData()
-        .setName(topic)
+        .setTopicName(topicName)
+        .setTopicId(topicId)

Review Comment:
   Uniquely relying on `canUseTopicIds` to decide if we can use topic id or not 
is not enough. The final decision is made when the request is build based on 
the api version advertised by the controller. This is why I set both the name 
and the topic id all the time here.
   
   `canUseTopicIds` basically tells us if this broker could use them - this 
assumes that the right IBP and that all pending partition changes have a topic 
id as well - but does not guarantee that they will be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to