hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892926556
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -217,36 +225,51 @@ class DefaultAlterPartitionManager(
})
}
- private def buildRequest(inflightAlterPartitionItems:
Seq[AlterPartitionItem]): AlterPartitionRequestData = {
+ private def buildRequest(
+ inflightAlterPartitionItems: Seq[AlterPartitionItem],
+ ): (AlterPartitionRequestData, Boolean, mutable.Map[Uuid, String]) = {
+ val metadataVersion = metadataVersionSupplier()
+ var canUseTopicIds = metadataVersion.isAtLeast(MetadataVersion.IBP_2_8_IV0)
+ val topicNamesByIds = mutable.HashMap[Uuid, String]()
+
val message = new AlterPartitionRequestData()
.setBrokerId(brokerId)
- .setBrokerEpoch(brokerEpochSupplier.apply())
+ .setBrokerEpoch(brokerEpochSupplier())
+
+ inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach {
case (topicName, items) =>
+ val topicId = items.head.topicIdPartition.topicId
+ // We use topic ids only if all the topics have one defined.
+ canUseTopicIds &= topicId != Uuid.ZERO_UUID
+ topicNamesByIds(topicId) = topicName
- inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach {
case (topic, items) =>
val topicData = new AlterPartitionRequestData.TopicData()
- .setName(topic)
+ .setTopicName(topicName)
+ .setTopicId(topicId)
Review Comment:
But if `canUseTopicIds` is false, then certainly we won't use them, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]