soarez commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } + ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { + int brokerId = request.brokerId(); + clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); + List<ApiMessageAndVersion> records = new ArrayList<>(); + AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData(); + Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>(); + for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { + Uuid dirId = reqDir.id(); + AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); + for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { + Uuid topicId = reqTopic.topicId(); + Errors topicError = Errors.NONE; + TopicControlInfo topicControl = this.topics.get(topicId); + if (topicControl == null) { + log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId); + topicError = Errors.UNKNOWN_TOPIC_ID; + } + AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId); + for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) { + int partitionIndex = reqPartition.partitionIndex(); + Errors partitionError = topicError; + if (topicError == Errors.NONE) { + String topicName = topicControl.name; + PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex); + if (partitionRegistration == null) { + log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, 
topicName, partitionIndex); + partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION; + } else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) { + log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex); + partitionError = Errors.NOT_LEADER_OR_FOLLOWER; + } else { + Optional<ApiMessageAndVersion> partitionChangeRecord = new PartitionChangeBuilder( + partitionRegistration, + topicId, + partitionIndex, + new LeaderAcceptor(clusterControl, partitionRegistration), + featureControl.metadataVersion(), + getTopicEffectiveMinIsr(topicName) + ) + .setDirectory(brokerId, dirId) + .setDefaultDirProvider(clusterDescriber) + .build(); + partitionChangeRecord.ifPresent(records::add); + + if (!brokerRegistration.hasOnlineDir(dirId)) { Review Comment: Good point -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org