soarez commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult<AssignReplicasToDirsResponseData> 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+        Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} 
references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new 
AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData 
reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = 
topicControl.parts.get(partitionIndex);
+                        if (partitionRegistration == null) {
+                            log.warn("AssignReplicasToDirsRequest from broker 
{} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                        } else if 
(!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+                            log.warn("AssignReplicasToDirsRequest from broker 
{} references non assigned partition {}-{}", brokerId, topicName, 
partitionIndex);
+                            partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+                        } else {
+                            Optional<ApiMessageAndVersion> 
partitionChangeRecord = new PartitionChangeBuilder(
+                                    partitionRegistration,
+                                    topicId,
+                                    partitionIndex,
+                                    new LeaderAcceptor(clusterControl, 
partitionRegistration),
+                                    featureControl.metadataVersion(),
+                                    getTopicEffectiveMinIsr(topicName)
+                            )
+                                    .setDirectory(brokerId, dirId)
+                                    .setDefaultDirProvider(clusterDescriber)
+                                    .build();
+                            partitionChangeRecord.ifPresent(records::add);
+
+                            if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment:
   Good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to